You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/01/19 02:27:30 UTC

[iotdb] branch add_python_interface updated: add test and fix bug

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch add_python_interface
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/add_python_interface by this push:
     new d754463  add test and fix bug
d754463 is described below

commit d75446399791239b42971f5ad61056c4664ac752
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Tue Jan 19 10:26:58 2021 +0800

    add test and fix bug
---
 client-py/src/SessionTest.py                       | 159 +++++++++++++++++++++
 client-py/src/iotdb/Session.py                     |  54 ++++++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  11 +-
 3 files changed, 220 insertions(+), 4 deletions(-)

diff --git a/client-py/src/SessionTest.py b/client-py/src/SessionTest.py
new file mode 100644
index 0000000..dc34c67
--- /dev/null
+++ b/client-py/src/SessionTest.py
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+from iotdb.Session import Session
+from iotdb.utils.Tablet import Tablet
+from iotdb.utils.IoTDBConstants import *
+
+# whether the test has passed
+final_flag = True
+failed_count = 0
+
+def test_fail(message):
+  global failed_count
+  global final_flag
+  print("*********")
+  print(message)
+  print("*********")
+  final_flag = False
+  failed_count += 1
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = 'root'
+password_ = 'root'
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id='UTC+8')
+session.open(False)
+
+if not session.is_open():
+  print("can't open session")
+  exit(1)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+
+if session.delete_storage_group("root.sg_test_02") < 0:
+  test_fail("delete storage group failed")
+
+if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
+  test_fail("delete storage groups failed")
+
+# setting time series.
+session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY)
+session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY)
+session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY)
+
+# setting multiple time series once.
+ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06",
+                "root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]
+data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT,
+                  TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_)
+
+# delete time series
+if session.delete_time_series(["root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]) < 0:
+  test_fail("delete time series failed")
+
+# checking time series
+# s_07 expecting False
+if session.check_time_series_exists("root.sg_test_01.d_01.s_07"):
+  test_fail("root.sg_test_01.d_01.s_07 shouldn't exist")
+
+# s_03 expecting True
+if not session.check_time_series_exists("root.sg_test_01.d_01.s_03"):
+  test_fail("root.sg_test_01.d_01.s_03 should exist")
+
+# insert one record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64,
+               TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
+if session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_) < 0:
+  test_fail("insert record failed")
+
+# insert multiple records into database
+measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+                      ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]]
+values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"],
+                [True, 77, 88, 1.25, 8.125, "test_records02"]]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
+if session.insert_records(device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_) < 0:
+  test_fail("insert records failed")
+
+# insert one tablet into the database.
+values_ = [[False, 10, 11, 1.1, 10011.1, "test01"],
+           [True, 100, 11111, 1.25, 101.0, "test02"],
+           [False, 100, 1, 188.1, 688.25, "test03"],
+           [True, 0, 0, 0, 6.25, "test04"]]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_)
+if session.insert_tablet(tablet_) < 0:
+  test_fail("insert tablet failed")
+
+# insert multiple tablets into database
+tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11])
+tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
+if session.insert_tablets([tablet_01, tablet_02]) < 0:
+  test_fail("insert tablets failed")
+
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"]]
+data_types_list = [[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+                   [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+                   [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64]]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+if session.insert_records_of_one_device("root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list) < 0:
+  test_fail("insert records of one device failed")
+
+# execute non-query sql statement
+if session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)") < 0:
+  test_fail("execute 'insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)' failed")
+
+# execute sql query statement
+session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
+session_data_set.set_fetch_size(1024)
+expect_count = 16
+actual_count = 0
+while session_data_set.has_next():
+  actual_count += 1
+  print(session_data_set.next())
+session_data_set.close_operation_handle()
+
+if actual_count != expect_count:
+  test_fail("query count mismatch: expect count: "
+            + str(expect_count) + " actual count: " + str(actual_count))
+
+# close session connection.
+session.close()
+
+if final_flag:
+  print("All executions done!!")
+else:
+  print("Some test failed, please have a check")
+  print("failed count: ", failed_count)
+  exit(1)
diff --git a/client-py/src/iotdb/Session.py b/client-py/src/iotdb/Session.py
index 3e1febb..3190aa3 100644
--- a/client-py/src/iotdb/Session.py
+++ b/client-py/src/iotdb/Session.py
@@ -44,6 +44,7 @@ from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZone
 
 
 class Session(object):
+    SUCCESS_CODE = 200
     DEFAULT_FETCH_SIZE = 10000
     DEFAULT_USER = 'root'
     DEFAULT_PASSWORD = 'root'
@@ -109,6 +110,9 @@ class Session(object):
 
         self.__is_close = False
 
+    def is_open(self):
+        return not self.__is_close
+
     def close(self):
         if self.__is_close:
             return
@@ -130,13 +134,15 @@ class Session(object):
         status = self.__client.setStorageGroup(self.__session_id, group_name)
         print("setting storage group {} message: {}".format(group_name, status.message))
 
+        return Session.verify_success(status)
+
     def delete_storage_group(self, storage_group):
         """
         delete one storage group.
         :param storage_group: String, path of the target storage group.
         """
         groups = [storage_group]
-        self.delete_storage_groups(groups)
+        return self.delete_storage_groups(groups)
 
     def delete_storage_groups(self, storage_group_lst):
         """
@@ -146,6 +152,8 @@ class Session(object):
         status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
         print("delete storage group(s) {} message: {}".format(storage_group_lst, status.message))
 
+        return Session.verify_success(status)
+
     def create_time_series(self, ts_path, data_type, encoding, compressor):
         """
         create single time series
@@ -161,6 +169,8 @@ class Session(object):
         status = self.__client.createTimeseries(request)
         print("creating time series {} message: {}".format(ts_path, status.message))
 
+        return Session.verify_success(status)
+
     def create_multi_time_series(self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst):
         """
         create multiple time series
@@ -178,6 +188,8 @@ class Session(object):
         status = self.__client.createMultiTimeseries(request)
         print("creating multiple time series {} message: {}".format(ts_path_lst, status.message))
 
+        return Session.verify_success(status)
+
     def delete_time_series(self, paths_list):
         """
         delete multiple time series, including data and schema
@@ -186,6 +198,8 @@ class Session(object):
         status = self.__client.deleteTimeseries(self.__session_id, paths_list)
         print("deleting multiple time series {} message: {}".format(paths_list, status.message))
 
+        return Session.verify_success(status)
+
     def check_time_series_exists(self, path):
         """
         check whether a specific time series exists
@@ -217,6 +231,8 @@ class Session(object):
         status = self.__client.insertRecord(request)
         print("insert one record to device {} message: {}".format(device_id, status.message))
 
+        return Session.verify_success(status)
+
     def insert_record(self, device_id, timestamp, measurements, data_types, values):
         """
         insert one row of record into database, if you want improve your performance, please use insertTablet method
@@ -234,6 +250,8 @@ class Session(object):
         status = self.__client.insertRecord(request)
         print("insert one record to device {} message: {}".format(device_id, status.message))
 
+        return Session.verify_success(status)
+
     def insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst):
         """
         insert multiple rows of data, records are independent to each other, in other words, there's no relationship
@@ -252,6 +270,8 @@ class Session(object):
         status = self.__client.insertRecords(request)
         print("insert multiple records to devices {} message: {}".format(device_ids, status.message))
 
+        return Session.verify_success(status)
+
     def test_insert_record(self, device_id, timestamp, measurements, data_types, values):
         """
         this method NOT insert data into database and the server just return after accept the request, this method
@@ -267,6 +287,8 @@ class Session(object):
         status = self.__client.testInsertRecord(request)
         print("testing! insert one record to device {} message: {}".format(device_id, status.message))
 
+        return Session.verify_success(status)
+
     def test_insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst):
         """
         this method NOT insert data into database and the server just return after accept the request, this method
@@ -285,6 +307,8 @@ class Session(object):
         status = self.__client.testInsertRecords(request)
         print("testing! insert multiple records, message: {}".format(status.message))
 
+        return Session.verify_success(status)
+
     def gen_insert_record_req(self, device_id, timestamp, measurements, data_types, values):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             print("length of data types does not equal to length of values!")
@@ -326,6 +350,8 @@ class Session(object):
         status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
         print("insert one tablet to device {} message: {}".format(tablet.get_device_id(), status.message))
 
+        return Session.verify_success(status)
+
     def insert_tablets(self, tablet_lst):
         """
         insert multiple tablets, tablets are independent to each other
@@ -334,6 +360,8 @@ class Session(object):
         status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
         print("insert multiple tablets, message: {}".format(status.message))
 
+        return Session.verify_success(status)
+
 
     def insert_records_of_one_device(self, device_id, times_list, measurements_list, types_list, values_list):
         # sort by timestamp
@@ -341,7 +369,7 @@ class Session(object):
         result = zip(*sorted_zipped)
         times_list, measurements_list, types_list, values_list = [list(x) for x in result]
 
-        self.insert_records_of_one_device_sorted(device_id, times_list, measurements_list, types_list, values_list)
+        return self.insert_records_of_one_device_sorted(device_id, times_list, measurements_list, types_list, values_list)
 
     def insert_records_of_one_device_sorted(self, device_id, times_list, measurements_list, types_list, values_list):
         """
@@ -373,6 +401,8 @@ class Session(object):
         status = self.__client.insertRecordsOfOneDevice(request)
         print("insert records of one device, message: {}".format(status.message))
 
+        return Session.verify_success(status)
+
     def gen_insert_records_of_one_device_request(self, device_id, times_list, measurements_list, values_list, types_list):
         binary_value_list = []
         for values, data_types, measurements in zip(values_list, types_list, measurements_list):
@@ -396,6 +426,8 @@ class Session(object):
         status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
         print("testing! insert one tablet to device {} message: {}".format(tablet.get_device_id(), status.message))
 
+        return Session.verify_success(status)
+
     def test_insert_tablets(self, tablet_list):
         """
          this method NOT insert data into database and the server just return after accept the request, this method
@@ -405,6 +437,8 @@ class Session(object):
         status = self.__client.testInsertTablets(self.gen_insert_tablets_req(tablet_list))
         print("testing! insert multiple tablets, message: {}".format(status.message))
 
+        return Session.verify_success(status)
+
     def gen_insert_tablet_req(self, tablet):
         data_type_values = [data_type.value for data_type in tablet.get_data_types()]
         return TSInsertTabletReq(self.__session_id, tablet.get_device_id(), tablet.get_measurements(),
@@ -450,8 +484,10 @@ class Session(object):
             resp = self.__client.executeUpdateStatement(request)
             status = resp.status
             print("execute non-query statement {} message: {}".format(sql, status.message))
+            return Session.verify_success(status)
         except TTransport.TException as e:
             print("execution of non-query statement fails because: ", e)
+            return -1
 
     @staticmethod
     def value_to_bytes(data_types, values):
@@ -524,4 +560,16 @@ class Session(object):
         for i in range(1, len(timestamps)):
             if timestamps[i] < timestamps[i - 1]:
                 return False
-        return True
\ No newline at end of file
+        return True
+
+    @staticmethod
+    def verify_success(status):
+        """
+        verify success of operation
+        :param status: execution result status
+        """
+        if status.code == Session.SUCCESS_CODE:
+            return 0
+
+        print("error status is", status)
+        return -1
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 261d0f5..9d5a34a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1139,7 +1139,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           TSStatusCode.INTERNAL_SERVER_ERROR));
     }
 
-    return RpcUtils.getStatus(statusList);
+    TSStatus resp = RpcUtils.getStatus(statusList);
+    for(TSStatus status : resp.subStatus){
+      if(status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()){
+        return resp;
+      }
+    }
+
+    resp.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+
+    return resp;
   }
 
   @Override