You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/21 16:28:31 UTC

[iotdb] branch master updated: [IOTDB-2038] MqttService accesses to BasicServiceProvider (#4440)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fd145d6  [IOTDB-2038] MqttService accesses to BasicServiceProvider (#4440)
fd145d6 is described below

commit fd145d6e5a8f77d1212e1d15759c5bcfcfa7ab82
Author: Xieqijun <44...@users.noreply.github.com>
AuthorDate: Mon Nov 22 00:27:59 2021 +0800

    [IOTDB-2038] MqttService accesses to BasicServiceProvider (#4440)
    
    At present, MqttService contains some duplicate code that can be replaced by calls to BasicServiceProvider, so as to avoid writing the same code repeatedly.
    
    In addition, MqttService does not have a complete authentication mechanism. At present, there is only a user login mechanism, but users do not necessarily have write permissions on all paths. If MqttService goes through BasicServiceProvider, we can improve the authentication function.
    
    see: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336794
    
    same pr: https://issues.apache.org/jira/browse/IOTDB-2036
---
 .../org/apache/iotdb/db/mqtt/PublishHandler.java   | 68 +++++++++++++-------
 .../db/service/basic/BasicServiceProvider.java     |  6 +-
 .../apache/iotdb/db/mqtt/PublishHandlerTest.java   | 75 +++++++++++++++++++---
 3 files changed, 114 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
index ea4ae86..a3d9069 100644
--- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -18,50 +18,78 @@
 package org.apache.iotdb.db.mqtt;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import io.moquette.interception.AbstractInterceptHandler;
+import io.moquette.interception.messages.InterceptConnectMessage;
+import io.moquette.interception.messages.InterceptDisconnectMessage;
 import io.moquette.interception.messages.InterceptPublishMessage;
 import io.netty.buffer.ByteBuf;
 import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.ZoneId;
 import java.util.List;
 
 /** PublishHandler handle the messages from MQTT clients. */
 public class PublishHandler extends AbstractInterceptHandler {
 
+  private final BasicServiceProvider basicServiceProvider;
+  private long sessionId;
+
   private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
 
-  private IPlanExecutor executor;
-  private PayloadFormatter payloadFormat;
+  private final PayloadFormatter payloadFormat;
 
   public PublishHandler(IoTDBConfig config) {
     this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
     try {
-      this.executor = new PlanExecutor();
+      this.basicServiceProvider = new BasicServiceProvider();
     } catch (QueryProcessException e) {
       throw new RuntimeException(e);
     }
   }
 
-  protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
-    this.executor = executor;
+  protected PublishHandler(PayloadFormatter payloadFormat) {
+    try {
+      this.basicServiceProvider = new BasicServiceProvider();
+    } catch (QueryProcessException e) {
+      throw new RuntimeException(e);
+    }
     this.payloadFormat = payloadFormat;
   }
 
   @Override
   public String getID() {
-    return "iotdb-mqtt-broker-listener";
+    return "iotdb-mqtt-broker-listener-" + sessionId;
+  }
+
+  @Override
+  public void onConnect(InterceptConnectMessage msg) {
+    try {
+      BasicOpenSessionResp basicOpenSessionResp =
+          basicServiceProvider.openSession(
+              msg.getUsername(),
+              new String(msg.getPassword()),
+              ZoneId.systemDefault().toString(),
+              TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+      sessionId = basicOpenSessionResp.getSessionId();
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onDisconnect(InterceptDisconnectMessage msg) {
+    basicServiceProvider.closeSession(sessionId);
   }
 
   @Override
@@ -101,7 +129,12 @@ public class PublishHandler extends AbstractInterceptHandler {
                 event.getTimestamp(),
                 event.getMeasurements().toArray(new String[0]),
                 event.getValues().toArray(new String[0]));
-        status = executeNonQuery(plan);
+        TSStatus tsStatus = basicServiceProvider.checkAuthority(plan, sessionId);
+        if (tsStatus != null) {
+          LOG.warn(tsStatus.message);
+        } else {
+          status = basicServiceProvider.executeNonQuery(plan);
+        }
       } catch (Exception e) {
         LOG.warn(
             "meet error when inserting device {}, measurements {}, at time {}, because ",
@@ -114,13 +147,4 @@ public class PublishHandler extends AbstractInterceptHandler {
       LOG.debug("event process result: {}", status);
     }
   }
-
-  private boolean executeNonQuery(PhysicalPlan plan)
-      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
-    if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
-      throw new QueryProcessException(
-          "Current system mode is read-only, does not support non-query operation");
-    }
-    return executor.processNonQuery(plan);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 693e99e..e11db24 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -111,7 +111,7 @@ public class BasicServiceProvider {
     return AuthorityChecker.check(username, paths, plan.getOperatorType(), targetUser);
   }
 
-  protected TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+  public TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
     List<? extends PartialPath> paths = plan.getPaths();
     try {
       if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
@@ -129,7 +129,7 @@ public class BasicServiceProvider {
     return null;
   }
 
-  protected BasicOpenSessionResp openSession(
+  public BasicOpenSessionResp openSession(
       String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
       throws TException {
     BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
@@ -183,7 +183,7 @@ public class BasicServiceProvider {
     return openSessionResp.sessionId(sessionId);
   }
 
-  protected boolean closeSession(long sessionId) {
+  public boolean closeSession(long sessionId) {
     AUDIT_LOGGER.info("Session-{} is closing", sessionId);
 
     sessionManager.removeCurrSessionId();
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
index 7a70b92..672a41d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
@@ -17,28 +17,43 @@
  */
 package org.apache.iotdb.db.mqtt;
 
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
 
+import io.moquette.interception.messages.InterceptConnectMessage;
+import io.moquette.interception.messages.InterceptDisconnectMessage;
 import io.moquette.interception.messages.InterceptPublishMessage;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.mqtt.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
+import java.sql.*;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class PublishHandlerTest {
 
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
   @Test
-  public void onPublish() throws Exception {
-    IPlanExecutor executor = mock(IPlanExecutor.class);
+  public void onPublish() throws ClassNotFoundException {
     PayloadFormatter payloadFormat = PayloadFormatManager.getPayloadFormat("json");
-    PublishHandler handler = new PublishHandler(executor, payloadFormat);
+    PublishHandler handler = new PublishHandler(payloadFormat);
 
     String payload =
         "{\n"
@@ -50,13 +65,53 @@ public class PublishHandlerTest {
 
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
 
+    // connect
+    MqttConnectPayload mqttConnectPayload =
+        new MqttConnectPayload(null, null, "test", "root", "root");
+    MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(null, null, mqttConnectPayload);
+    InterceptConnectMessage interceptConnectMessage =
+        new InterceptConnectMessage(mqttConnectMessage);
+    handler.onConnect(interceptConnectMessage);
+
+    // publish
     MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("root.sg.d1", 1);
     MqttFixedHeader fixedHeader =
         new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1);
-
     MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, buf);
     InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, null, null);
     handler.onPublish(message);
-    verify(executor).processNonQuery(any(InsertRowPlan.class));
+
+    // disconnect
+    InterceptDisconnectMessage interceptDisconnectMessage =
+        new InterceptDisconnectMessage(null, null);
+    handler.onDisconnect(interceptDisconnectMessage);
+
+    String[] retArray = new String[] {"1586076045524,0.530635,"};
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet = statement.execute("select * from root.sg.d1");
+      Assert.assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+            builder.append(resultSet.getString(i)).append(",");
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
   }
 }