You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/14 15:33:48 UTC

[iotdb] branch master updated: [IOTDB-2913] Add InfluxDB Protocol Test Framework (#5546)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d808d844d6 [IOTDB-2913] Add InfluxDB Protocol Test Framework (#5546)
d808d844d6 is described below

commit d808d844d6ce28890ede3ddc53cd9401e6e46c01
Author: 刘威 <51...@users.noreply.github.com>
AuthorDate: Thu Apr 14 23:33:43 2022 +0800

    [IOTDB-2913] Add InfluxDB Protocol Test Framework (#5546)
    
    Co-authored-by: xieqijun <xi...@kuaishou.com>
---
 .github/workflows/influxdb-protocol.yml            |  4 +-
 .../apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4    |  1 +
 docker/src/main/Dockerfile-single-influxdb         | 46 ++++++++++++++++++++++
 docs/UserGuide/API/InfluxDB-Protocol.md            |  5 ++-
 docs/zh/UserGuide/API/InfluxDB-Protocol.md         |  4 +-
 .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java   |  1 +
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  | 20 +++-------
 .../influxdb/integration/IoTDBInfluxDBIT.java      | 34 ++++++++--------
 .../db/protocol/influxdb/handler/QueryHandler.java |  6 +--
 .../influxdb/meta/InfluxDBMetaManager.java         | 13 +++++-
 .../protocol/influxdb/util/QueryResultUtils.java   |  3 +-
 11 files changed, 94 insertions(+), 43 deletions(-)

diff --git a/.github/workflows/influxdb-protocol.yml b/.github/workflows/influxdb-protocol.yml
index e13e83d5b6..4879944208 100644
--- a/.github/workflows/influxdb-protocol.yml
+++ b/.github/workflows/influxdb-protocol.yml
@@ -61,10 +61,10 @@ jobs:
 
       - name: Build Docker Image
         run: |
-          docker build . -f docker/src/main/Dockerfile-single -t "apache/iotdb:maven-development"
+          docker build . -f docker/src/main/Dockerfile-single-influxdb -t "apache/iotdb:influxdb-protocol-on"
           docker images
 
       - name: IT Test
         shell: bash
         run: |
-          cd influxdb-protocol && mvn -B clean compile post-integration-test -Dtest.port.closed=true
+          cd influxdb-protocol && mvn -B clean compile post-integration-test -Dtest.port.closed=true -Dinfluxdb.test.skip=false
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4
index db06fcd798..ad29971e89 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4
@@ -76,6 +76,7 @@ nodeName
     | QUTOED_ID_IN_NODE_NAME
     | LAST
     | COUNT
+    | DEVICE
     ;
 
 // Identifier
diff --git a/docker/src/main/Dockerfile-single-influxdb b/docker/src/main/Dockerfile-single-influxdb
new file mode 100644
index 0000000000..b59bf93585
--- /dev/null
+++ b/docker/src/main/Dockerfile-single-influxdb
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# docker build context is the root path of the repository
+
+FROM openjdk:11-jre-slim
+
+ADD distribution/target/apache-iotdb-*-server-bin.zip /
+
+RUN apt update \
+  && apt install lsof dos2unix procps unzip -y \
+  && unzip /apache-iotdb-*-bin.zip -d / \
+  && rm /apache-iotdb-*-bin.zip \
+  && mv /apache-iotdb-* /iotdb \
+  && sed -i '/^# enable_influxdb_rpc_service=false/a enable_influxdb_rpc_service=true' /iotdb/sbin/../conf/iotdb-engine.properties \
+  && apt remove unzip -y \
+  && apt autoremove -y \
+  && apt purge --auto-remove -y \
+  && apt clean -y
+RUN dos2unix /iotdb/sbin/start-server.sh
+RUN dos2unix /iotdb/sbin/../conf/iotdb-env.sh
+EXPOSE 6667
+EXPOSE 31999
+EXPOSE 5555
+EXPOSE 8086
+EXPOSE 8181
+VOLUME /iotdb/data
+VOLUME /iotdb/logs
+ENV PATH="/iotdb/sbin/:/iotdb/tools/:${PATH}"
+ENTRYPOINT ["/iotdb/sbin/start-server.sh"]
diff --git a/docs/UserGuide/API/InfluxDB-Protocol.md b/docs/UserGuide/API/InfluxDB-Protocol.md
index 8df48181d1..df2a6112bd 100644
--- a/docs/UserGuide/API/InfluxDB-Protocol.md
+++ b/docs/UserGuide/API/InfluxDB-Protocol.md
@@ -29,7 +29,7 @@
     </dependency>
 ```
 
-Here are some examples of connecting IoTDB using the InfluxDB-Protocol adapter : https://github.com/apache/iotdb/tree/master/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example
+Here are some [examples](https://github.com/apache/iotdb/tree/master/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example) of connecting IoTDB using the InfluxDB-Protocol adapter.
 
 ## 1. Switching Scheme
 
@@ -262,6 +262,9 @@ However, this will not affect the correctness of the query, because once the tag
 
 Currently, supports InfluxDB 1.x version, which does not support InfluxDB 2.x version.
 
+The Maven dependency of `influxdb-java` supports 2.21 +, and the lower version is not tested.
+
+
 ### 3.2 Function Interface Support
 
 Currently, supports interface functions are as follows:
diff --git a/docs/zh/UserGuide/API/InfluxDB-Protocol.md b/docs/zh/UserGuide/API/InfluxDB-Protocol.md
index a5d09dec92..ea8ff1a791 100644
--- a/docs/zh/UserGuide/API/InfluxDB-Protocol.md
+++ b/docs/zh/UserGuide/API/InfluxDB-Protocol.md
@@ -29,7 +29,7 @@
     </dependency>
 ```
 
-这里是一些使用 InfluxDB-Protocol 适配器连接 IoTDB 的示例:https://github.com/apache/iotdb/tree/master/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example
+这里是一些使用 InfluxDB-Protocol 适配器连接 IoTDB 的[示例](https://github.com/apache/iotdb/tree/master/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/example)
 
 
 ## 1.切换方案
@@ -266,6 +266,8 @@ time                address name phone sex socre
 
 目前支持InfluxDB 1.x 版本,暂不支持InfluxDB 2.x 版本。
 
+`influxdb-java`的maven依赖支持2.21+,低版本未进行测试。
+
 ### 3.2 函数接口支持情况
 
 目前支持的接口函数如下:
diff --git a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
index 54d5065d45..d000118e38 100644
--- a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
+++ b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/IoTDBInfluxDB.java
@@ -89,6 +89,7 @@ public class IoTDBInfluxDB implements InfluxDB {
         if (reflectField.getType().getName().equalsIgnoreCase("java.util.concurrent.TimeUnit")
             && reflectField.getName().equalsIgnoreCase("precision")) {
           precision = (TimeUnit) reflectField.get(point);
+          break;
         }
       } catch (IllegalAccessException e) {
         throw new IllegalArgumentException(e.getMessage());
diff --git a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/dto/SessionPoint.java b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/dto/SessionPoint.java
index da2083b563..403e6e4039 100644
--- a/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/dto/SessionPoint.java
+++ b/influxdb-protocol/src/main/java/org/apache/iotdb/influxdb/protocol/dto/SessionPoint.java
@@ -18,14 +18,12 @@
  */
 package org.apache.iotdb.influxdb.protocol.dto;
 
-import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxEndPoint;
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.session.Session;
 
-import org.influxdb.InfluxDBException;
-
 public class SessionPoint {
-  private final String host;
-  private final int rpcPort;
+  private String host;
+  private int rpcPort;
   private final String username;
   private final String password;
 
@@ -37,8 +35,6 @@ public class SessionPoint {
   }
 
   public SessionPoint(Session session) {
-
-    InfluxEndPoint endPoint = null;
     String username = null;
     String password = null;
 
@@ -49,9 +45,10 @@ public class SessionPoint {
         if (reflectField
                 .getType()
                 .getName()
-                .equalsIgnoreCase("org.apache.iotdb.service.rpc.thrift.EndPoint")
+                .equalsIgnoreCase("org.apache.iotdb.common.rpc.thrift.EndPoint")
             && reflectField.getName().equalsIgnoreCase("defaultEndPoint")) {
-          endPoint = (InfluxEndPoint) reflectField.get(session);
+          this.rpcPort = ((EndPoint) reflectField.get(session)).port;
+          this.host = ((EndPoint) reflectField.get(session)).ip;
         }
         if (reflectField.getType().getName().equalsIgnoreCase("java.lang.String")
             && reflectField.getName().equalsIgnoreCase("username")) {
@@ -65,12 +62,7 @@ public class SessionPoint {
         throw new IllegalArgumentException(e.getMessage());
       }
     }
-    if (endPoint == null) {
-      throw new InfluxDBException("session's ip and port is null");
-    }
 
-    this.host = endPoint.ip;
-    this.rpcPort = endPoint.port;
     this.username = username;
     this.password = password;
   }
diff --git a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
index acf7d6ee1f..079878a915 100644
--- a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
+++ b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java
@@ -27,8 +27,8 @@ import org.influxdb.dto.Point;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.utility.DockerImageName;
@@ -41,31 +41,31 @@ import static org.junit.Assert.*;
 
 public class IoTDBInfluxDBIT {
 
-  private String host;
-  private Integer port;
-  private String username;
-  private String password;
-  private InfluxDB influxDB;
+  private static String host;
+  private static Integer port;
+  private static String username;
+  private static String password;
+  private static InfluxDB influxDB;
 
-  @Rule
-  public GenericContainer<?> iotdb =
-      new GenericContainer(DockerImageName.parse("apache/iotdb:maven-development"))
+  @ClassRule
+  public static GenericContainer<?> iotdb =
+      new GenericContainer(DockerImageName.parse("apache/iotdb:influxdb-protocol-on"))
           .withExposedPorts(8086);
 
-  @Before
-  public void setUp() {
+  @BeforeClass
+  public static void setUp() {
     host = iotdb.getContainerIpAddress();
     port = iotdb.getMappedPort(8086);
     username = "root";
     password = "root";
     influxDB = IoTDBInfluxDBFactory.connect(host, port, username, password);
-    influxDB.createDatabase("monitor");
-    influxDB.setDatabase("monitor");
+    influxDB.createDatabase("database");
+    influxDB.setDatabase("database");
 
     insertData();
   }
 
-  private void insertData() {
+  private static void insertData() {
     // insert the build parameter to construct the influxdb
     Point.Builder builder = Point.measurement("student");
     Map<String, String> tags = new HashMap<>();
@@ -178,7 +178,7 @@ public class IoTDBInfluxDBIT {
     QueryResult result = influxDB.query(query);
     QueryResult.Series series = result.getResults().get(0).getSeries().get(0);
 
-    Object[] retArray = new Object[] {0, 99.0, 87.0, 186, 2, 12.0, 93, 87, 99};
+    Object[] retArray = new Object[] {0.0, 99.0, 87.0, 186.0, 2.0, 12.0, 93.0, 87.0, 99.0};
     for (int i = 0; i < series.getColumns().size(); i++) {
       assertEquals(retArray[i], series.getValues().get(0).get(i));
     }
@@ -194,7 +194,7 @@ public class IoTDBInfluxDBIT {
     QueryResult.Series series = result.getResults().get(0).getSeries().get(0);
 
     Object[] retArray =
-        new Object[] {0, 2, 87, "china", 99.0, 93.0, 93.0, 87.0, 87, 12.0, 6.0, 186.0};
+        new Object[] {0.0, 2.0, 87.0, "china", 99.0, 93.0, 93.0, 87.0, 87.0, 12.0, 6.0, 186.0};
     for (int i = 0; i < series.getColumns().size(); i++) {
       assertEquals(retArray[i], series.getValues().get(0).get(i));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
index 143a4b9167..0ec2fa1f56 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
@@ -285,8 +285,7 @@ public class QueryHandler {
           serviceProvider.createQueryDataSet(
               queryContext, physicalPlan, InfluxConstant.DEFAULT_FETCH_SIZE);
       int fieldNums = 0;
-      Map<String, Integer> tagOrders =
-          InfluxDBMetaManager.getDatabase2Measurement2TagOrders().get(database).get(measurement);
+      Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
       int tagOrderNums = tagOrders.size();
       while (queryDataSet.hasNext()) {
         List<Field> fields = queryDataSet.next().getFields();
@@ -820,8 +819,7 @@ public class QueryHandler {
     List<SingleSeriesExpression> fieldExpressions = new ArrayList<>();
     // maximum number of tags in the current query criteria
     int currentQueryMaxTagNum = 0;
-    Map<String, Integer> tagOrders =
-        InfluxDBMetaManager.getDatabase2Measurement2TagOrders().get(database).get(measurement);
+    Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
     for (IExpression expression : expressions) {
       SingleSeriesExpression singleSeriesExpression = ((SingleSeriesExpression) expression);
       // the current condition is in tag
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
index 261a0e936c..4c2ef1918c 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java
@@ -204,8 +204,17 @@ public class InfluxDBMetaManager {
     }
   }
 
-  public static Map<String, Map<String, Map<String, Integer>>> getDatabase2Measurement2TagOrders() {
-    return database2Measurement2TagOrders;
+  public static Map<String, Integer> getTagOrders(String database, String measurement) {
+    Map<String, Integer> tagOrders = new HashMap<>();
+    Map<String, Map<String, Integer>> measurement2TagOrders =
+        database2Measurement2TagOrders.get(database);
+    if (measurement2TagOrders != null) {
+      tagOrders = measurement2TagOrders.get(measurement);
+    }
+    if (tagOrders == null) {
+      tagOrders = new HashMap<>();
+    }
+    return tagOrders;
   }
 
   private static class InfluxDBMetaManagerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
index df4c431fa2..686dd20b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
@@ -76,8 +76,7 @@ public class QueryResultUtils {
     QueryResult.Series series = new QueryResult.Series();
     series.setName(measurement);
     // gets the reverse map of the tag
-    Map<String, Integer> tagOrders =
-        InfluxDBMetaManager.getDatabase2Measurement2TagOrders().get(database).get(measurement);
+    Map<String, Integer> tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement);
     Map<Integer, String> tagOrderReversed =
         tagOrders.entrySet().stream()
             .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));