You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/08/15 13:24:43 UTC
[iotdb] branch master updated: [To master] Set line break to Unix
style in spotless (#3742)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 685dbfb [To master] Set line break to Unix style in spotless (#3742)
685dbfb is described below
commit 685dbfb5f2907588d851204e1c6a8cbd6424b86d
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Sun Aug 15 21:24:13 2021 +0800
[To master] Set line break to Unix style in spotless (#3742)
---
.gitattributes | 1 +
client-cpp/src/main/Session.cpp | 2190 +++++++++++++++++++--------------------
client-cpp/src/main/Session.h | 1474 +++++++++++++-------------
pom.xml | 1 +
4 files changed, 1834 insertions(+), 1832 deletions(-)
diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..07764a7
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1 @@
+* text eol=lf
\ No newline at end of file
diff --git a/client-cpp/src/main/Session.cpp b/client-cpp/src/main/Session.cpp
index 6d639b7..f440af3 100644
--- a/client-cpp/src/main/Session.cpp
+++ b/client-cpp/src/main/Session.cpp
@@ -1,1095 +1,1095 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "Session.h"
-
-using namespace std;
-
-TSDataType::TSDataType getTSDataTypeFromString(string str) {
- // BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, NULLTYPE
- if (str == "BOOLEAN") return TSDataType::BOOLEAN;
- else if (str == "INT32") return TSDataType::INT32;
- else if (str == "INT64") return TSDataType::INT64;
- else if (str == "FLOAT") return TSDataType::FLOAT;
- else if (str == "DOUBLE") return TSDataType::DOUBLE;
- else if (str == "TEXT") return TSDataType::TEXT;
- else if (str == "NULLTYPE") return TSDataType::NULLTYPE;
- return TSDataType::TEXT;
-}
-
-void RpcUtils::verifySuccess(TSStatus &status) {
- if (status.code == TSStatusCode::MULTIPLE_ERROR) {
- verifySuccess(status.subStatus);
- return;
- }
- if (status.code != TSStatusCode::SUCCESS_STATUS) {
- throw IoTDBConnectionException(to_string(status.code) + ": " + status.message.c_str());
- }
-}
-
-void RpcUtils::verifySuccess(vector <TSStatus> &statuses) {
- for (TSStatus status : statuses) {
- if (status.code != TSStatusCode::SUCCESS_STATUS) {
- throw BatchExecutionException(statuses, status.message);
- }
- }
-}
-
-TSStatus RpcUtils::getStatus(TSStatusCode::TSStatusCode tsStatusCode) {
- TSStatus tmpTSStatus = TSStatus();
- tmpTSStatus.__set_code(tsStatusCode);
- return tmpTSStatus;
-}
-
-TSStatus RpcUtils::getStatus(int code, string message) {
- TSStatus status = TSStatus();
- status.__set_code(code);
- status.__set_message(message);
- return status;
-}
-
-shared_ptr <TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
- TSStatus status = getStatus(tsStatusCode);
- return getTSExecuteStatementResp(status);
-}
-
-shared_ptr <TSExecuteStatementResp>
-RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, string message) {
- TSStatus status = getStatus(tsStatusCode, message);
- return getTSExecuteStatementResp(status);
-}
-
-shared_ptr <TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(TSStatus &status) {
- shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
- TSStatus tsStatus(status);
- resp->status = status;
- return resp;
-}
-
-shared_ptr <TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode) {
- TSStatus status = getStatus(tsStatusCode);
- return getTSFetchResultsResp(status);
-}
-
-shared_ptr <TSFetchResultsResp>
-RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, string appendMessage) {
- TSStatus status = getStatus(tsStatusCode, appendMessage);
- return getTSFetchResultsResp(status);
-}
-
-shared_ptr <TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(TSStatus &status) {
- shared_ptr <TSFetchResultsResp> resp(new TSFetchResultsResp());
- TSStatus tsStatus(status);
- resp->__set_status(tsStatus);
- return resp;
-}
-
-void Tablet::reset() {
- rowSize = 0;
-}
-
-void Tablet::createColumns() {
- // create timestamp column
- timestamps.resize(maxRowNumber);
- // create value columns
- values.resize(schemas.size());
- for (int i = 0; i < schemas.size(); i++) {
- values[i].resize(maxRowNumber);
- }
-}
-
-int Tablet::getTimeBytesSize() {
- return rowSize * 8;
-}
-
-int Tablet::getValueByteSize() {
- int valueOccupation = 0;
- for (int i = 0; i < schemas.size(); i++) {
- switch (schemas[i].second) {
- case TSDataType::BOOLEAN:
- valueOccupation += rowSize;
- break;
- case TSDataType::INT32:
- valueOccupation += rowSize * 4;
- break;
- case TSDataType::INT64:
- valueOccupation += rowSize * 8;
- break;
- case TSDataType::FLOAT:
- valueOccupation += rowSize * 4;
- break;
- case TSDataType::DOUBLE:
- valueOccupation += rowSize * 8;
- break;
- case TSDataType::TEXT:
- valueOccupation += rowSize * 4;
- for (string value : values[i]) {
- valueOccupation += value.size();
- }
- break;
- default:
- throw UnSupportedDataTypeException(
- string("Data type ") + to_string(schemas[i].second) + " is not supported.");
- }
- }
- return valueOccupation;
-}
-
-string SessionUtils::getTime(Tablet &tablet) {
- MyStringBuffer timeBuffer;
- for (int i = 0; i < tablet.rowSize; i++) {
- timeBuffer.putLong(tablet.timestamps[i]);
- }
- return timeBuffer.str;
-}
-
-string SessionUtils::getValue(Tablet &tablet) {
- MyStringBuffer valueBuffer;
- for (int i = 0; i < tablet.schemas.size(); i++) {
- TSDataType::TSDataType dataType = tablet.schemas[i].second;
- switch (dataType) {
- case TSDataType::BOOLEAN:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putBool(tablet.values[i][index] == "true");
- }
- break;
- case TSDataType::INT32:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putInt(stoi(tablet.values[i][index]));
- }
- break;
- case TSDataType::INT64:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putLong(stol(tablet.values[i][index]));
- }
- break;
- case TSDataType::FLOAT:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putFloat(stof(tablet.values[i][index]));
- }
- break;
- case TSDataType::DOUBLE:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putDouble(stod(tablet.values[i][index]));
- }
- break;
- case TSDataType::TEXT:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putString(tablet.values[i][index]);
- }
- break;
- default:
- throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
- }
- }
- return valueBuffer.str;
-}
-
-int SessionDataSet::getBatchSize() {
- return batchSize;
-}
-
-void SessionDataSet::setBatchSize(int batchSize) {
- this->batchSize = batchSize;
-}
-
-vector <string> SessionDataSet::getColumnNames() { return this->columnNameList; }
-
-bool SessionDataSet::hasNext() {
- if (hasCachedRecord) {
- return true;
- }
- if (!tsQueryDataSetTimeBuffer.hasRemaining()) {
- shared_ptr <TSFetchResultsReq> req(new TSFetchResultsReq());
- req->__set_sessionId(sessionId);
- req->__set_statement(sql);
- req->__set_fetchSize(batchSize);
- req->__set_queryId(queryId);
- req->__set_isAlign(true);
- try {
- shared_ptr <TSFetchResultsResp> resp(new TSFetchResultsResp());
- client->fetchResults(*resp, *req);
- RpcUtils::verifySuccess(resp->status);
-
- if (!resp->hasResultSet) {
- return false;
- } else {
- tsQueryDataSet = make_shared<TSQueryDataSet>(resp->queryDataSet);
- tsQueryDataSetTimeBuffer = tsQueryDataSet->time;
- rowsIndex = 0;
- }
- }
- catch (IoTDBConnectionException e) {
- throw IoTDBConnectionException(
- string("Cannot fetch result from server, because of network connection: ") + e.what());
- }
- }
-
- constructOneRow();
- hasCachedRecord = true;
- return true;
-}
-
-void SessionDataSet::constructOneRow() {
- vector <Field> outFields;
- int loc = 0;
- for (int i = 0; i < columnSize; i++) {
- Field field;
- if (duplicateLocation.find(i) != duplicateLocation.end()) {
- field = outFields[duplicateLocation[i]];
- } else {
- MyStringBuffer *bitmapBuffer = bitmapBuffers[loc].get();
- // another new 8 row, should move the bitmap buffer position to next byte
- if (rowsIndex % 8 == 0) {
- currentBitmap[loc] = bitmapBuffer->getChar();
- }
-
- if (!isNull(loc, rowsIndex)) {
- MyStringBuffer *valueBuffer = valueBuffers[loc].get();
- TSDataType::TSDataType dataType = getTSDataTypeFromString(columnTypeDeduplicatedList[loc]);
- field.dataType = dataType;
- switch (dataType) {
- case TSDataType::BOOLEAN: {
- bool booleanValue = valueBuffer->getBool();
- field.boolV = booleanValue;
- break;
- }
- case TSDataType::INT32: {
- int intValue = valueBuffer->getInt();
- field.intV = intValue;
- break;
- }
- case TSDataType::INT64: {
- int64_t longValue = valueBuffer->getLong();
- field.longV = longValue;
- break;
- }
- case TSDataType::FLOAT: {
- float floatValue = valueBuffer->getFloat();
- field.floatV = floatValue;
- break;
- }
- case TSDataType::DOUBLE: {
- double doubleValue = valueBuffer->getDouble();
- field.doubleV = doubleValue;
- break;
- }
- case TSDataType::TEXT: {
- string stringValue = valueBuffer->getString();
- field.stringV = stringValue;
- break;
- }
- default: {
- throw UnSupportedDataTypeException(
- string("Data type ") + columnTypeDeduplicatedList[i].c_str() + " is not supported.");
- }
- }
- } else {
- field.dataType = TSDataType::NULLTYPE;
- }
- loc++;
- }
- outFields.push_back(field);
- }
-
- rowRecord = RowRecord(tsQueryDataSetTimeBuffer.getLong(), outFields);
- rowsIndex++;
-}
-
-bool SessionDataSet::isNull(int index, int rowNum) {
- char bitmap = currentBitmap[index];
- int shift = rowNum % 8;
- return ((flag >> shift) & bitmap) == 0;
-}
-
-RowRecord *SessionDataSet::next() {
- if (!hasCachedRecord) {
- if (!hasNext()) {
- return NULL;
- }
- }
-
- hasCachedRecord = false;
- return &rowRecord;
-}
-
-void SessionDataSet::closeOperationHandle() {
- shared_ptr <TSCloseOperationReq> closeReq(new TSCloseOperationReq());
- closeReq->__set_sessionId(sessionId);
- closeReq->__set_statementId(statementId);
- closeReq->__set_queryId(queryId);
- shared_ptr <TSStatus> closeResp(new TSStatus());
- try {
- client->closeOperation(*closeResp, *closeReq);
- RpcUtils::verifySuccess(*closeResp);
- }
- catch (IoTDBConnectionException e) {
- throw IoTDBConnectionException(
- string("Error occurs when connecting to server for close operation, because: ") + e.what());
- }
-}
-
-
-/**
- * check whether the batch has been sorted
- *
- * @return whether the batch has been sorted
- */
-bool Session::checkSorted(Tablet &tablet) {
- for (int i = 1; i < tablet.rowSize; i++) {
- if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
- return false;
- }
- }
- return true;
-}
-
-bool Session::checkSorted(vector <int64_t> ×) {
- for (int i = 1; i < times.size(); i++) {
- if (times[i] < times[i - 1]) {
- return false;
- }
- }
- return true;
-}
-
-void Session::sortTablet(Tablet &tablet) {
- /*
- * following part of code sort the batch data by time,
- * so we can insert continuous data in value list to get a better performance
- */
- // sort to get index, and use index to sort value list
- int *index = new int[tablet.rowSize];
- for (int i = 0; i < tablet.rowSize; i++) {
- index[i] = i;
- }
-
- this->sortIndexByTimestamp(index, tablet.timestamps, tablet.rowSize);
- sort(tablet.timestamps.begin(), tablet.timestamps.begin() + tablet.rowSize);
- for (int i = 0; i < tablet.schemas.size(); i++) {
- tablet.values[i] = sortList(tablet.values[i], index, tablet.rowSize);
- }
-
- delete[] index;
-}
-
-void Session::sortIndexByTimestamp(int *index, std::vector <int64_t> ×tamps, int length) {
- // Use Insert Sort Algorithm
- if (length >= 2) {
- for (int i = 1; i < length; i++) {
- int x = timestamps[i];
- int tmpIndex = index[i];
- int j = i - 1;
-
- while (j >= 0 && timestamps[j] > x) {
- timestamps[j + 1] = timestamps[j];
- index[j + 1] = index[j];
- j--;
- }
-
- timestamps[j + 1] = x;
- index[j + 1] = tmpIndex;
- }
- }
-}
-
-/**
- * Append value into buffer in Big Endian order to comply with IoTDB server
- */
-void Session::appendValues(string &buffer, char *value, int size) {
- for (int i = size - 1; i >= 0; i--) {
- buffer.append(value + i, 1);
- }
-}
-
-void Session::putValuesIntoBuffer(vector <TSDataType::TSDataType> &types, vector<char *> &values, string &buf) {
- for (int i = 0; i < values.size(); i++) {
- int8_t typeNum = getDataTypeNumber(types[i]);
- buf.append((char *) (&typeNum), sizeof(int8_t));
- switch (types[i]) {
- case TSDataType::BOOLEAN:
- buf.append(values[i], 1);
- break;
- case TSDataType::INT32:
- appendValues(buf, values[i], sizeof(int32_t));
- break;
- case TSDataType::INT64:
- appendValues(buf, values[i], sizeof(int64_t));
- break;
- case TSDataType::FLOAT:
- appendValues(buf, values[i], sizeof(float));
- break;
- case TSDataType::DOUBLE:
- appendValues(buf, values[i], sizeof(double));
- break;
- case TSDataType::TEXT:
- string str(values[i]);
- int len = str.length();
- appendValues(buf, (char *) (&len), sizeof(int));
- // no need to change the byte order of string value
- buf.append(values[i], len);
- break;
- }
- }
-}
-
-int8_t Session::getDataTypeNumber(TSDataType::TSDataType type) {
- switch (type) {
- case TSDataType::BOOLEAN:
- return 0;
- case TSDataType::INT32:
- return 1;
- case TSDataType::INT64:
- return 2;
- case TSDataType::FLOAT:
- return 3;
- case TSDataType::DOUBLE:
- return 4;
- case TSDataType::TEXT:
- return 5;
- default:
- return -1;
- }
-}
-
-void Session::open() {
- try {
- open(false, DEFAULT_TIMEOUT_MS);
- }
- catch (IoTDBConnectionException e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::open(bool enableRPCCompression) {
- try {
- open(enableRPCCompression, DEFAULT_TIMEOUT_MS);
- }
- catch (IoTDBConnectionException e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
- if (!isClosed) {
- return;
- }
- shared_ptr <TSocket> socket(new TSocket(host, rpcPort));
- shared_ptr <TTransport> transport(new TFramedTransport(socket));
- socket->setConnTimeout(connectionTimeoutInMs);
- if (!transport->isOpen()) {
- try {
- transport->open();
- }
- catch (TTransportException e) {
- throw IoTDBConnectionException(e.what());
- }
- }
- if (enableRPCCompression) {
- shared_ptr <TCompactProtocol> protocol(new TCompactProtocol(transport));
- shared_ptr <TSIServiceIf> client_instance(new TSIServiceClient(protocol));
- client = client_instance;
- } else {
- shared_ptr <TBinaryProtocol> protocol(new TBinaryProtocol(transport));
- shared_ptr <TSIServiceIf> client_instance(new TSIServiceClient(protocol));
- client = client_instance;
- }
- shared_ptr <TSOpenSessionReq> openReq(new TSOpenSessionReq());
- openReq->__set_username(username);
- openReq->__set_password(password);
- openReq->__set_zoneId(zoneId);
- try {
- shared_ptr <TSOpenSessionResp> openResp(new TSOpenSessionResp());
- client->openSession(*openResp, *openReq);
- RpcUtils::verifySuccess(openResp->status);
- if (protocolVersion != openResp->serverProtocolVersion) {
- if (openResp->serverProtocolVersion == 0) {// less than 0.10
- throw logic_error(string("Protocol not supported, Client version is ") + to_string(protocolVersion) +
- ", but Server version is " + to_string(openResp->serverProtocolVersion));
- }
- }
-
- sessionId = openResp->sessionId;
- statementId = client->requestStatementId(sessionId);
-
- if (zoneId != "") {
- setTimeZone(zoneId);
- } else {
- zoneId = getTimeZone();
- }
- }
- catch (exception e) {
- transport->close();
- throw IoTDBConnectionException(e.what());
- }
- isClosed = false;
-}
-
-
-void Session::close() {
- if (isClosed) {
- return;
- }
- shared_ptr <TSCloseSessionReq> req(new TSCloseSessionReq());
- req->__set_sessionId(sessionId);
- try {
- shared_ptr <TSStatus> resp(new TSStatus());
- client->closeSession(*resp, *req);
- }
- catch (exception e) {
- throw IoTDBConnectionException(
- string("Error occurs when closing session at server. Maybe server is down. ") + e.what());
- }
- isClosed = true;
- if (transport != NULL) {
- transport->close();
- }
-}
-
-
-void Session::insertRecord(string deviceId, int64_t time, vector <string> &measurements, vector <string> &values) {
- shared_ptr <TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
- req->__set_sessionId(sessionId);
- req->__set_deviceId(deviceId);
- req->__set_timestamp(time);
- req->__set_measurements(measurements);
- req->__set_values(values);
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->insertStringRecord(*resp, *req);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::insertRecord(string prefixPath, int64_t time, vector <string> &measurements,
- vector <TSDataType::TSDataType> &types, vector<char *> &values) {
- shared_ptr <TSInsertRecordReq> req(new TSInsertRecordReq());
- req->__set_sessionId(sessionId);
- req->__set_prefixPath(prefixPath);
- req->__set_timestamp(time);
- req->__set_measurements(measurements);
- string buffer;
- putValuesIntoBuffer(types, values, buffer);
- req->__set_values(buffer);
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->insertRecord(*resp, *req);
- RpcUtils::verifySuccess(*resp);
- } catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void
-Session::insertRecords(vector <string> &deviceIds, vector <int64_t> ×, vector <vector<string>> &measurementsList,
- vector <vector<string>> &valuesList) {
- int len = deviceIds.size();
- if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
- logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
- throw exception(e);
- }
- shared_ptr <TSInsertStringRecordsReq> request(new TSInsertStringRecordsReq());
- request->__set_sessionId(sessionId);
- request->__set_deviceIds(deviceIds);
- request->__set_timestamps(times);
- request->__set_measurementsList(measurementsList);
- request->__set_valuesList(valuesList);
-
- try {
- shared_ptr <TSStatus> resp(new TSStatus());
- client->insertStringRecords(*resp, *request);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::insertRecords(vector <string> &deviceIds, vector <int64_t> ×,
- vector <vector<string>> &measurementsList,
- vector <vector<TSDataType::TSDataType>> typesList,
- vector <vector<char *>> &valuesList) {
- int len = deviceIds.size();
- if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
- logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
- throw exception(e);
- }
- shared_ptr <TSInsertRecordsReq> request(new TSInsertRecordsReq());
- request->__set_sessionId(sessionId);
- request->__set_deviceIds(deviceIds);
- request->__set_timestamps(times);
- request->__set_measurementsList(measurementsList);
- vector <string> bufferList;
- for (int i = 0; i < valuesList.size(); i++) {
- string buffer;
- putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
- bufferList.push_back(buffer);
- }
- request->__set_valuesList(bufferList);
-
- try {
- shared_ptr <TSStatus> resp(new TSStatus());
- client->insertRecords(*resp, *request);
- RpcUtils::verifySuccess(*resp);
- } catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::insertRecordsOfOneDevice(string deviceId, vector <int64_t> ×,
- vector <vector<string>> measurementsList,
- vector <vector<TSDataType::TSDataType>> typesList,
- vector <vector<char *>> &valuesList) {
- insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
-}
-
-void Session::insertRecordsOfOneDevice(string deviceId, vector <int64_t> ×,
- vector <vector<string>> measurementsList,
- vector <vector<TSDataType::TSDataType>> typesList,
- vector <vector<char *>> &valuesList, bool sorted) {
-
- if (sorted) {
- if (!checkSorted(times)) {
- throw BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
- }
- } else {
- int *index = new int[times.size()];
- for (int i = 0; i < times.size(); i++) {
- index[i] = i;
- }
-
- this->sortIndexByTimestamp(index, times, times.size());
- sort(times.begin(), times.end());
- measurementsList = sortList(measurementsList, index, times.size());
- typesList = sortList(typesList, index, times.size());
- valuesList = sortList(valuesList, index, times.size());
- delete[] index;
- }
- unique_ptr <TSInsertRecordsOfOneDeviceReq> request(new TSInsertRecordsOfOneDeviceReq());
- request->__set_sessionId(sessionId);
- request->__set_deviceId(deviceId);
- request->__set_timestamps(times);
- request->__set_measurementsList(measurementsList);
- vector <string> bufferList;
- for (int i = 0; i < valuesList.size(); i++) {
- string buffer;
- putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
- bufferList.push_back(buffer);
- }
- request->__set_valuesList(bufferList);
-
- try {
- unique_ptr <TSStatus> resp(new TSStatus());
- client->insertRecordsOfOneDevice(*resp, *request);
- RpcUtils::verifySuccess(*resp);
- } catch (const exception &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::insertTablet(Tablet &tablet) {
- try {
- insertTablet(tablet, false);
- }
- catch (const exception &e) {
- logic_error error(e.what());
- throw exception(error);
- }
-}
-
-void Session::insertTablet(Tablet &tablet, bool sorted) {
- if (sorted) {
- if (!checkSorted(tablet)) {
- throw BatchExecutionException("Times in Tablet are not in ascending order");
- }
- } else {
- sortTablet(tablet);
- }
-
- shared_ptr <TSInsertTabletReq> request(new TSInsertTabletReq());
- request->__set_sessionId(sessionId);
- request->prefixPath = tablet.deviceId;
- for (pair <string, TSDataType::TSDataType> schema : tablet.schemas) {
- request->measurements.push_back(schema.first);
- request->types.push_back(schema.second);
- }
- request->__set_timestamps(SessionUtils::getTime(tablet));
- request->__set_values(SessionUtils::getValue(tablet));
- request->__set_size(tablet.rowSize);
-
- try {
- shared_ptr <TSStatus> resp(new TSStatus());
- client->insertTablet(*resp, *request);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw new IoTDBConnectionException(e.what());
- }
-}
-
-void Session::insertTablets(map<string, Tablet *> &tablets) {
- try {
- insertTablets(tablets, false);
- }
- catch (const exception &e) {
- logic_error error(e.what());
- throw exception(error);
- }
-}
-
-void Session::insertTablets(map<string, Tablet *> &tablets, bool sorted) {
- shared_ptr <TSInsertTabletsReq> request(new TSInsertTabletsReq());
- request->__set_sessionId(sessionId);
-
- for (auto &item : tablets) {
- if (sorted) {
- if (!checkSorted(*(item.second))) {
- throw BatchExecutionException("Times in Tablet are not in ascending order");
- }
- } else {
- sortTablet(*(tablets[item.first]));
- }
-
- request->deviceIds.push_back(item.second->deviceId);
- vector <string> measurements;
- vector<int> dataTypes;
- for (pair <string, TSDataType::TSDataType> schema : item.second->schemas) {
- measurements.push_back(schema.first);
- dataTypes.push_back(schema.second);
- }
- request->measurementsList.push_back(measurements);
- request->typesList.push_back(dataTypes);
- request->timestampsList.push_back(SessionUtils::getTime(*(item.second)));
- request->valuesList.push_back(SessionUtils::getValue(*(item.second)));
- request->sizeList.push_back(item.second->rowSize);
-
- try {
- shared_ptr <TSStatus> resp(new TSStatus());
- client->insertTablets(*resp, *request);
- RpcUtils::verifySuccess(*resp);
- }
- catch (const exception &e) {
- throw IoTDBConnectionException(e.what());
- }
- }
-}
-
-void Session::testInsertRecord(string deviceId, int64_t time, vector <string> &measurements, vector <string> &values) {
- shared_ptr <TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
- req->__set_sessionId(sessionId);
- req->__set_deviceId(deviceId);
- req->__set_timestamp(time);
- req->__set_measurements(measurements);
- req->__set_values(values);
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->insertStringRecord(*resp, *req);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::testInsertTablet(Tablet &tablet) {
- shared_ptr <TSInsertTabletReq> request(new TSInsertTabletReq());
- request->__set_sessionId(sessionId);
- request->prefixPath = tablet.deviceId;
- for (pair <string, TSDataType::TSDataType> schema : tablet.schemas) {
- request->measurements.push_back(schema.first);
- request->types.push_back(schema.second);
- }
- request->__set_timestamps(SessionUtils::getTime(tablet));
- request->__set_values(SessionUtils::getValue(tablet));
- request->__set_size(tablet.rowSize);
-
- try {
- shared_ptr <TSStatus> resp(new TSStatus());
- client->testInsertTablet(*resp, *request);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw new IoTDBConnectionException(e.what());
- }
-}
-
-void Session::testInsertRecords(vector <string> &deviceIds, vector <int64_t> ×,
- vector <vector<string>> &measurementsList, vector <vector<string>> &valuesList) {
- int len = deviceIds.size();
- if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
- logic_error error("deviceIds, times, measurementsList and valuesList's size should be equal");
- throw exception(error);
- }
- shared_ptr <TSInsertStringRecordsReq> request(new TSInsertStringRecordsReq());
- request->__set_sessionId(sessionId);
- request->__set_deviceIds(deviceIds);
- request->__set_timestamps(times);
- request->__set_measurementsList(measurementsList);
- request->__set_valuesList(valuesList);
-
- try {
- shared_ptr <TSStatus> resp(new TSStatus());
- client->insertStringRecords(*resp, *request);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::deleteTimeseries(string path) {
- vector <string> paths;
- paths.push_back(path);
- deleteTimeseries(paths);
-}
-
-void Session::deleteTimeseries(vector <string> &paths) {
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->deleteTimeseries(*resp, sessionId, paths);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::deleteData(string path, int64_t time) {
- vector <string> paths;
- paths.push_back(path);
- deleteData(paths, time);
-}
-
-void Session::deleteData(vector <string> &deviceId, int64_t time) {
- shared_ptr <TSDeleteDataReq> req(new TSDeleteDataReq());
- req->__set_sessionId(sessionId);
- req->__set_paths(deviceId);
- req->__set_endTime(time);
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->deleteData(*resp, *req);
- RpcUtils::verifySuccess(*resp);
- }
- catch (exception &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::setStorageGroup(string storageGroupId) {
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->setStorageGroup(*resp, sessionId, storageGroupId);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::deleteStorageGroup(string storageGroup) {
- vector <string> storageGroups;
- storageGroups.push_back(storageGroup);
- deleteStorageGroups(storageGroups);
-}
-
-void Session::deleteStorageGroups(vector <string> &storageGroups) {
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->deleteStorageGroups(*resp, sessionId, storageGroups);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::createTimeseries(string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
- CompressionType::CompressionType compressor) {
- try {
- createTimeseries(path, dataType, encoding, compressor, NULL, NULL, NULL, "");
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::createTimeseries(string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
- CompressionType::CompressionType compressor,
- map <string, string> *props, map <string, string> *tags,
- map <string, string> *attributes, string measurementAlias) {
- shared_ptr <TSCreateTimeseriesReq> req(new TSCreateTimeseriesReq());
- req->__set_sessionId(sessionId);
- req->__set_path(path);
- req->__set_dataType(dataType);
- req->__set_encoding(encoding);
- req->__set_compressor(compressor);
- if (props != NULL) {
- req->__set_props(*props);
- }
-
- if (tags != NULL) {
- req->__set_tags(*tags);
- }
- if (attributes != NULL) {
- req->__set_attributes(*attributes);
- }
- if (measurementAlias != "") {
- req->__set_measurementAlias(measurementAlias);
- }
-
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->createTimeseries(*resp, *req);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-void Session::createMultiTimeseries(vector <string> paths, vector <TSDataType::TSDataType> dataTypes,
- vector <TSEncoding::TSEncoding> encodings,
- vector <CompressionType::CompressionType> compressors,
- vector <map<string, string>> *propsList, vector <map<string, string>> *tagsList,
- vector <map<string, string>> *attributesList,
- vector <string> *measurementAliasList) {
- shared_ptr <TSCreateMultiTimeseriesReq> request(new TSCreateMultiTimeseriesReq());
- request->__set_sessionId(sessionId);
- request->__set_paths(paths);
-
- vector<int> dataTypesOrdinal;
- for (TSDataType::TSDataType dataType : dataTypes) {
- dataTypesOrdinal.push_back(dataType);
- }
- request->__set_dataTypes(dataTypesOrdinal);
-
- vector<int> encodingsOrdinal;
- for (TSEncoding::TSEncoding encoding : encodings) {
- encodingsOrdinal.push_back(encoding);
- }
- request->__set_encodings(encodingsOrdinal);
-
- vector<int> compressorsOrdinal;
- for (CompressionType::CompressionType compressor: compressors) {
- compressorsOrdinal.push_back(compressor);
- }
- request->__set_compressors(compressorsOrdinal);
-
- if (propsList != NULL) {
- request->__set_propsList(*propsList);
- }
-
- if (tagsList != NULL) {
- request->__set_tagsList(*tagsList);
- }
- if (attributesList != NULL) {
- request->__set_attributesList(*attributesList);
- }
- if (measurementAliasList != NULL) {
- request->__set_measurementAliasList(*measurementAliasList);
- }
-
- try {
- shared_ptr <TSStatus> resp(new TSStatus());
- client->createMultiTimeseries(*resp, *request);
- RpcUtils::verifySuccess(*resp);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-bool Session::checkTimeseriesExists(string path) {
- try {
- std::unique_ptr <SessionDataSet> dataset = executeQueryStatement("SHOW TIMESERIES " + path);
- bool isExisted = dataset->hasNext();
- dataset->closeOperationHandle();
- return isExisted;
- }
- catch (exception e) {
- throw IoTDBConnectionException(e.what());
- }
-}
-
-string Session::getTimeZone() {
- if (zoneId != "") {
- return zoneId;
- }
- shared_ptr <TSGetTimeZoneResp> resp(new TSGetTimeZoneResp());
- try {
- client->getTimeZone(*resp, sessionId);
- RpcUtils::verifySuccess(resp->status);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
- return resp->timeZone;
-}
-
-void Session::setTimeZone(string zoneId) {
- shared_ptr <TSSetTimeZoneReq> req(new TSSetTimeZoneReq());
- req->__set_sessionId(sessionId);
- req->__set_timeZone(zoneId);
- shared_ptr <TSStatus> resp(new TSStatus());
- try {
- client->setTimeZone(*resp, *req);
- }
- catch (IoTDBConnectionException &e) {
- throw IoTDBConnectionException(e.what());
- }
- RpcUtils::verifySuccess(*resp);
- this->zoneId = zoneId;
-}
-
-unique_ptr <SessionDataSet> Session::executeQueryStatement(string sql) {
- shared_ptr <TSExecuteStatementReq> req(new TSExecuteStatementReq());
- req->__set_sessionId(sessionId);
- req->__set_statementId(statementId);
- req->__set_statement(sql);
- req->__set_fetchSize(fetchSize);
- shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
- try {
- client->executeStatement(*resp, *req);
- RpcUtils::verifySuccess(resp->status);
- }
- catch (IoTDBConnectionException e) {
- throw IoTDBConnectionException(e.what());
- }
- shared_ptr <TSQueryDataSet> queryDataSet(new TSQueryDataSet(resp->queryDataSet));
- return unique_ptr<SessionDataSet>(new SessionDataSet(
- sql, resp->columns, resp->dataTypeList, resp->queryId, statementId, client, sessionId, queryDataSet));
-}
-
-void Session::executeNonQueryStatement(string sql) {
- shared_ptr <TSExecuteStatementReq> req(new TSExecuteStatementReq());
- req->__set_sessionId(sessionId);
- req->__set_statementId(statementId);
- req->__set_statement(sql);
- shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
- try {
- client->executeUpdateStatement(*resp, *req);
- RpcUtils::verifySuccess(resp->status);
- }
- catch (IoTDBConnectionException e) {
- throw IoTDBConnectionException(e.what());
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Session.h"
+
+using namespace std;
+
+TSDataType::TSDataType getTSDataTypeFromString(string str) {
+ // BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, NULLTYPE
+ if (str == "BOOLEAN") return TSDataType::BOOLEAN;
+ else if (str == "INT32") return TSDataType::INT32;
+ else if (str == "INT64") return TSDataType::INT64;
+ else if (str == "FLOAT") return TSDataType::FLOAT;
+ else if (str == "DOUBLE") return TSDataType::DOUBLE;
+ else if (str == "TEXT") return TSDataType::TEXT;
+ else if (str == "NULLTYPE") return TSDataType::NULLTYPE;
+ return TSDataType::TEXT;
+}
+
+void RpcUtils::verifySuccess(TSStatus &status) {
+ if (status.code == TSStatusCode::MULTIPLE_ERROR) {
+ verifySuccess(status.subStatus);
+ return;
+ }
+ if (status.code != TSStatusCode::SUCCESS_STATUS) {
+ throw IoTDBConnectionException(to_string(status.code) + ": " + status.message.c_str());
+ }
+}
+
+void RpcUtils::verifySuccess(vector <TSStatus> &statuses) {
+ for (TSStatus status : statuses) {
+ if (status.code != TSStatusCode::SUCCESS_STATUS) {
+ throw BatchExecutionException(statuses, status.message);
+ }
+ }
+}
+
+TSStatus RpcUtils::getStatus(TSStatusCode::TSStatusCode tsStatusCode) {
+ TSStatus tmpTSStatus = TSStatus();
+ tmpTSStatus.__set_code(tsStatusCode);
+ return tmpTSStatus;
+}
+
+TSStatus RpcUtils::getStatus(int code, string message) {
+ TSStatus status = TSStatus();
+ status.__set_code(code);
+ status.__set_message(message);
+ return status;
+}
+
+shared_ptr <TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
+ TSStatus status = getStatus(tsStatusCode);
+ return getTSExecuteStatementResp(status);
+}
+
+shared_ptr <TSExecuteStatementResp>
+RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, string message) {
+ TSStatus status = getStatus(tsStatusCode, message);
+ return getTSExecuteStatementResp(status);
+}
+
+shared_ptr <TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(TSStatus &status) {
+ shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+ TSStatus tsStatus(status);
+ resp->status = status;
+ return resp;
+}
+
+shared_ptr <TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode) {
+ TSStatus status = getStatus(tsStatusCode);
+ return getTSFetchResultsResp(status);
+}
+
+shared_ptr <TSFetchResultsResp>
+RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, string appendMessage) {
+ TSStatus status = getStatus(tsStatusCode, appendMessage);
+ return getTSFetchResultsResp(status);
+}
+
+shared_ptr <TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(TSStatus &status) {
+ shared_ptr <TSFetchResultsResp> resp(new TSFetchResultsResp());
+ TSStatus tsStatus(status);
+ resp->__set_status(tsStatus);
+ return resp;
+}
+
+void Tablet::reset() {
+ rowSize = 0;
+}
+
+void Tablet::createColumns() {
+ // create timestamp column
+ timestamps.resize(maxRowNumber);
+ // create value columns
+ values.resize(schemas.size());
+ for (int i = 0; i < schemas.size(); i++) {
+ values[i].resize(maxRowNumber);
+ }
+}
+
+int Tablet::getTimeBytesSize() {
+ return rowSize * 8;
+}
+
+int Tablet::getValueByteSize() {
+ int valueOccupation = 0;
+ for (int i = 0; i < schemas.size(); i++) {
+ switch (schemas[i].second) {
+ case TSDataType::BOOLEAN:
+ valueOccupation += rowSize;
+ break;
+ case TSDataType::INT32:
+ valueOccupation += rowSize * 4;
+ break;
+ case TSDataType::INT64:
+ valueOccupation += rowSize * 8;
+ break;
+ case TSDataType::FLOAT:
+ valueOccupation += rowSize * 4;
+ break;
+ case TSDataType::DOUBLE:
+ valueOccupation += rowSize * 8;
+ break;
+ case TSDataType::TEXT:
+ valueOccupation += rowSize * 4;
+ for (string value : values[i]) {
+ valueOccupation += value.size();
+ }
+ break;
+ default:
+ throw UnSupportedDataTypeException(
+ string("Data type ") + to_string(schemas[i].second) + " is not supported.");
+ }
+ }
+ return valueOccupation;
+}
+
+string SessionUtils::getTime(Tablet &tablet) {
+ MyStringBuffer timeBuffer;
+ for (int i = 0; i < tablet.rowSize; i++) {
+ timeBuffer.putLong(tablet.timestamps[i]);
+ }
+ return timeBuffer.str;
+}
+
+string SessionUtils::getValue(Tablet &tablet) {
+ MyStringBuffer valueBuffer;
+ for (int i = 0; i < tablet.schemas.size(); i++) {
+ TSDataType::TSDataType dataType = tablet.schemas[i].second;
+ switch (dataType) {
+ case TSDataType::BOOLEAN:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putBool(tablet.values[i][index] == "true");
+ }
+ break;
+ case TSDataType::INT32:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putInt(stoi(tablet.values[i][index]));
+ }
+ break;
+ case TSDataType::INT64:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putLong(stol(tablet.values[i][index]));
+ }
+ break;
+ case TSDataType::FLOAT:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putFloat(stof(tablet.values[i][index]));
+ }
+ break;
+ case TSDataType::DOUBLE:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putDouble(stod(tablet.values[i][index]));
+ }
+ break;
+ case TSDataType::TEXT:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putString(tablet.values[i][index]);
+ }
+ break;
+ default:
+ throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
+ }
+ }
+ return valueBuffer.str;
+}
+
+int SessionDataSet::getBatchSize() {
+ return batchSize;
+}
+
+void SessionDataSet::setBatchSize(int batchSize) {
+ this->batchSize = batchSize;
+}
+
+vector <string> SessionDataSet::getColumnNames() { return this->columnNameList; }
+
+bool SessionDataSet::hasNext() {
+ if (hasCachedRecord) {
+ return true;
+ }
+ if (!tsQueryDataSetTimeBuffer.hasRemaining()) {
+ shared_ptr <TSFetchResultsReq> req(new TSFetchResultsReq());
+ req->__set_sessionId(sessionId);
+ req->__set_statement(sql);
+ req->__set_fetchSize(batchSize);
+ req->__set_queryId(queryId);
+ req->__set_isAlign(true);
+ try {
+ shared_ptr <TSFetchResultsResp> resp(new TSFetchResultsResp());
+ client->fetchResults(*resp, *req);
+ RpcUtils::verifySuccess(resp->status);
+
+ if (!resp->hasResultSet) {
+ return false;
+ } else {
+ tsQueryDataSet = make_shared<TSQueryDataSet>(resp->queryDataSet);
+ tsQueryDataSetTimeBuffer = tsQueryDataSet->time;
+ rowsIndex = 0;
+ }
+ }
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(
+ string("Cannot fetch result from server, because of network connection: ") + e.what());
+ }
+ }
+
+ constructOneRow();
+ hasCachedRecord = true;
+ return true;
+}
+
+void SessionDataSet::constructOneRow() {
+ vector <Field> outFields;
+ int loc = 0;
+ for (int i = 0; i < columnSize; i++) {
+ Field field;
+ if (duplicateLocation.find(i) != duplicateLocation.end()) {
+ field = outFields[duplicateLocation[i]];
+ } else {
+ MyStringBuffer *bitmapBuffer = bitmapBuffers[loc].get();
+ // another new 8 row, should move the bitmap buffer position to next byte
+ if (rowsIndex % 8 == 0) {
+ currentBitmap[loc] = bitmapBuffer->getChar();
+ }
+
+ if (!isNull(loc, rowsIndex)) {
+ MyStringBuffer *valueBuffer = valueBuffers[loc].get();
+ TSDataType::TSDataType dataType = getTSDataTypeFromString(columnTypeDeduplicatedList[loc]);
+ field.dataType = dataType;
+ switch (dataType) {
+ case TSDataType::BOOLEAN: {
+ bool booleanValue = valueBuffer->getBool();
+ field.boolV = booleanValue;
+ break;
+ }
+ case TSDataType::INT32: {
+ int intValue = valueBuffer->getInt();
+ field.intV = intValue;
+ break;
+ }
+ case TSDataType::INT64: {
+ int64_t longValue = valueBuffer->getLong();
+ field.longV = longValue;
+ break;
+ }
+ case TSDataType::FLOAT: {
+ float floatValue = valueBuffer->getFloat();
+ field.floatV = floatValue;
+ break;
+ }
+ case TSDataType::DOUBLE: {
+ double doubleValue = valueBuffer->getDouble();
+ field.doubleV = doubleValue;
+ break;
+ }
+ case TSDataType::TEXT: {
+ string stringValue = valueBuffer->getString();
+ field.stringV = stringValue;
+ break;
+ }
+ default: {
+ throw UnSupportedDataTypeException(
+ string("Data type ") + columnTypeDeduplicatedList[i].c_str() + " is not supported.");
+ }
+ }
+ } else {
+ field.dataType = TSDataType::NULLTYPE;
+ }
+ loc++;
+ }
+ outFields.push_back(field);
+ }
+
+ rowRecord = RowRecord(tsQueryDataSetTimeBuffer.getLong(), outFields);
+ rowsIndex++;
+}
+
+bool SessionDataSet::isNull(int index, int rowNum) {
+ char bitmap = currentBitmap[index];
+ int shift = rowNum % 8;
+ return ((flag >> shift) & bitmap) == 0;
+}
+
+RowRecord *SessionDataSet::next() {
+ if (!hasCachedRecord) {
+ if (!hasNext()) {
+ return NULL;
+ }
+ }
+
+ hasCachedRecord = false;
+ return &rowRecord;
+}
+
+void SessionDataSet::closeOperationHandle() {
+ shared_ptr <TSCloseOperationReq> closeReq(new TSCloseOperationReq());
+ closeReq->__set_sessionId(sessionId);
+ closeReq->__set_statementId(statementId);
+ closeReq->__set_queryId(queryId);
+ shared_ptr <TSStatus> closeResp(new TSStatus());
+ try {
+ client->closeOperation(*closeResp, *closeReq);
+ RpcUtils::verifySuccess(*closeResp);
+ }
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(
+ string("Error occurs when connecting to server for close operation, because: ") + e.what());
+ }
+}
+
+
+/**
+ * check whether the batch has been sorted
+ *
+ * @return whether the batch has been sorted
+ */
+bool Session::checkSorted(Tablet &tablet) {
+ for (int i = 1; i < tablet.rowSize; i++) {
+ if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool Session::checkSorted(vector <int64_t> ×) {
+ for (int i = 1; i < times.size(); i++) {
+ if (times[i] < times[i - 1]) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void Session::sortTablet(Tablet &tablet) {
+ /*
+ * following part of code sort the batch data by time,
+ * so we can insert continuous data in value list to get a better performance
+ */
+ // sort to get index, and use index to sort value list
+ int *index = new int[tablet.rowSize];
+ for (int i = 0; i < tablet.rowSize; i++) {
+ index[i] = i;
+ }
+
+ this->sortIndexByTimestamp(index, tablet.timestamps, tablet.rowSize);
+ sort(tablet.timestamps.begin(), tablet.timestamps.begin() + tablet.rowSize);
+ for (int i = 0; i < tablet.schemas.size(); i++) {
+ tablet.values[i] = sortList(tablet.values[i], index, tablet.rowSize);
+ }
+
+ delete[] index;
+}
+
+void Session::sortIndexByTimestamp(int *index, std::vector <int64_t> ×tamps, int length) {
+ // Use Insert Sort Algorithm
+ if (length >= 2) {
+ for (int i = 1; i < length; i++) {
+ int x = timestamps[i];
+ int tmpIndex = index[i];
+ int j = i - 1;
+
+ while (j >= 0 && timestamps[j] > x) {
+ timestamps[j + 1] = timestamps[j];
+ index[j + 1] = index[j];
+ j--;
+ }
+
+ timestamps[j + 1] = x;
+ index[j + 1] = tmpIndex;
+ }
+ }
+}
+
+/**
+ * Append value into buffer in Big Endian order to comply with IoTDB server
+ */
+void Session::appendValues(string &buffer, char *value, int size) {
+ for (int i = size - 1; i >= 0; i--) {
+ buffer.append(value + i, 1);
+ }
+}
+
+void Session::putValuesIntoBuffer(vector <TSDataType::TSDataType> &types, vector<char *> &values, string &buf) {
+ for (int i = 0; i < values.size(); i++) {
+ int8_t typeNum = getDataTypeNumber(types[i]);
+ buf.append((char *) (&typeNum), sizeof(int8_t));
+ switch (types[i]) {
+ case TSDataType::BOOLEAN:
+ buf.append(values[i], 1);
+ break;
+ case TSDataType::INT32:
+ appendValues(buf, values[i], sizeof(int32_t));
+ break;
+ case TSDataType::INT64:
+ appendValues(buf, values[i], sizeof(int64_t));
+ break;
+ case TSDataType::FLOAT:
+ appendValues(buf, values[i], sizeof(float));
+ break;
+ case TSDataType::DOUBLE:
+ appendValues(buf, values[i], sizeof(double));
+ break;
+ case TSDataType::TEXT:
+ string str(values[i]);
+ int len = str.length();
+ appendValues(buf, (char *) (&len), sizeof(int));
+ // no need to change the byte order of string value
+ buf.append(values[i], len);
+ break;
+ }
+ }
+}
+
+int8_t Session::getDataTypeNumber(TSDataType::TSDataType type) {
+ switch (type) {
+ case TSDataType::BOOLEAN:
+ return 0;
+ case TSDataType::INT32:
+ return 1;
+ case TSDataType::INT64:
+ return 2;
+ case TSDataType::FLOAT:
+ return 3;
+ case TSDataType::DOUBLE:
+ return 4;
+ case TSDataType::TEXT:
+ return 5;
+ default:
+ return -1;
+ }
+}
+
+void Session::open() {
+ try {
+ open(false, DEFAULT_TIMEOUT_MS);
+ }
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::open(bool enableRPCCompression) {
+ try {
+ open(enableRPCCompression, DEFAULT_TIMEOUT_MS);
+ }
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
+ if (!isClosed) {
+ return;
+ }
+ shared_ptr <TSocket> socket(new TSocket(host, rpcPort));
+ shared_ptr <TTransport> transport(new TFramedTransport(socket));
+ socket->setConnTimeout(connectionTimeoutInMs);
+ if (!transport->isOpen()) {
+ try {
+ transport->open();
+ }
+ catch (TTransportException e) {
+ throw IoTDBConnectionException(e.what());
+ }
+ }
+ if (enableRPCCompression) {
+ shared_ptr <TCompactProtocol> protocol(new TCompactProtocol(transport));
+ shared_ptr <TSIServiceIf> client_instance(new TSIServiceClient(protocol));
+ client = client_instance;
+ } else {
+ shared_ptr <TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+ shared_ptr <TSIServiceIf> client_instance(new TSIServiceClient(protocol));
+ client = client_instance;
+ }
+ shared_ptr <TSOpenSessionReq> openReq(new TSOpenSessionReq());
+ openReq->__set_username(username);
+ openReq->__set_password(password);
+ openReq->__set_zoneId(zoneId);
+ try {
+ shared_ptr <TSOpenSessionResp> openResp(new TSOpenSessionResp());
+ client->openSession(*openResp, *openReq);
+ RpcUtils::verifySuccess(openResp->status);
+ if (protocolVersion != openResp->serverProtocolVersion) {
+ if (openResp->serverProtocolVersion == 0) {// less than 0.10
+ throw logic_error(string("Protocol not supported, Client version is ") + to_string(protocolVersion) +
+ ", but Server version is " + to_string(openResp->serverProtocolVersion));
+ }
+ }
+
+ sessionId = openResp->sessionId;
+ statementId = client->requestStatementId(sessionId);
+
+ if (zoneId != "") {
+ setTimeZone(zoneId);
+ } else {
+ zoneId = getTimeZone();
+ }
+ }
+ catch (exception e) {
+ transport->close();
+ throw IoTDBConnectionException(e.what());
+ }
+ isClosed = false;
+}
+
+
+void Session::close() {
+ if (isClosed) {
+ return;
+ }
+ shared_ptr <TSCloseSessionReq> req(new TSCloseSessionReq());
+ req->__set_sessionId(sessionId);
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->closeSession(*resp, *req);
+ }
+ catch (exception e) {
+ throw IoTDBConnectionException(
+ string("Error occurs when closing session at server. Maybe server is down. ") + e.what());
+ }
+ isClosed = true;
+ if (transport != NULL) {
+ transport->close();
+ }
+}
+
+
+void Session::insertRecord(string deviceId, int64_t time, vector <string> &measurements, vector <string> &values) {
+ shared_ptr <TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
+ req->__set_sessionId(sessionId);
+ req->__set_deviceId(deviceId);
+ req->__set_timestamp(time);
+ req->__set_measurements(measurements);
+ req->__set_values(values);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->insertStringRecord(*resp, *req);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::insertRecord(string prefixPath, int64_t time, vector <string> &measurements,
+ vector <TSDataType::TSDataType> &types, vector<char *> &values) {
+ shared_ptr <TSInsertRecordReq> req(new TSInsertRecordReq());
+ req->__set_sessionId(sessionId);
+ req->__set_prefixPath(prefixPath);
+ req->__set_timestamp(time);
+ req->__set_measurements(measurements);
+ string buffer;
+ putValuesIntoBuffer(types, values, buffer);
+ req->__set_values(buffer);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->insertRecord(*resp, *req);
+ RpcUtils::verifySuccess(*resp);
+ } catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void
+Session::insertRecords(vector <string> &deviceIds, vector <int64_t> ×, vector <vector<string>> &measurementsList,
+ vector <vector<string>> &valuesList) {
+ int len = deviceIds.size();
+ if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
+ logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
+ throw exception(e);
+ }
+ shared_ptr <TSInsertStringRecordsReq> request(new TSInsertStringRecordsReq());
+ request->__set_sessionId(sessionId);
+ request->__set_deviceIds(deviceIds);
+ request->__set_timestamps(times);
+ request->__set_measurementsList(measurementsList);
+ request->__set_valuesList(valuesList);
+
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->insertStringRecords(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::insertRecords(vector <string> &deviceIds, vector <int64_t> ×,
+ vector <vector<string>> &measurementsList,
+ vector <vector<TSDataType::TSDataType>> typesList,
+ vector <vector<char *>> &valuesList) {
+ int len = deviceIds.size();
+ if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
+ logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
+ throw exception(e);
+ }
+ shared_ptr <TSInsertRecordsReq> request(new TSInsertRecordsReq());
+ request->__set_sessionId(sessionId);
+ request->__set_deviceIds(deviceIds);
+ request->__set_timestamps(times);
+ request->__set_measurementsList(measurementsList);
+ vector <string> bufferList;
+ for (int i = 0; i < valuesList.size(); i++) {
+ string buffer;
+ putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
+ bufferList.push_back(buffer);
+ }
+ request->__set_valuesList(bufferList);
+
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->insertRecords(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ } catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::insertRecordsOfOneDevice(string deviceId, vector <int64_t> ×,
+ vector <vector<string>> measurementsList,
+ vector <vector<TSDataType::TSDataType>> typesList,
+ vector <vector<char *>> &valuesList) {
+ insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
+}
+
+void Session::insertRecordsOfOneDevice(string deviceId, vector <int64_t> ×,
+ vector <vector<string>> measurementsList,
+ vector <vector<TSDataType::TSDataType>> typesList,
+ vector <vector<char *>> &valuesList, bool sorted) {
+
+ if (sorted) {
+ if (!checkSorted(times)) {
+ throw BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
+ }
+ } else {
+ int *index = new int[times.size()];
+ for (int i = 0; i < times.size(); i++) {
+ index[i] = i;
+ }
+
+ this->sortIndexByTimestamp(index, times, times.size());
+ sort(times.begin(), times.end());
+ measurementsList = sortList(measurementsList, index, times.size());
+ typesList = sortList(typesList, index, times.size());
+ valuesList = sortList(valuesList, index, times.size());
+ delete[] index;
+ }
+ unique_ptr <TSInsertRecordsOfOneDeviceReq> request(new TSInsertRecordsOfOneDeviceReq());
+ request->__set_sessionId(sessionId);
+ request->__set_deviceId(deviceId);
+ request->__set_timestamps(times);
+ request->__set_measurementsList(measurementsList);
+ vector <string> bufferList;
+ for (int i = 0; i < valuesList.size(); i++) {
+ string buffer;
+ putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
+ bufferList.push_back(buffer);
+ }
+ request->__set_valuesList(bufferList);
+
+ try {
+ unique_ptr <TSStatus> resp(new TSStatus());
+ client->insertRecordsOfOneDevice(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ } catch (const exception &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::insertTablet(Tablet &tablet) {
+ try {
+ insertTablet(tablet, false);
+ }
+ catch (const exception &e) {
+ logic_error error(e.what());
+ throw exception(error);
+ }
+}
+
+void Session::insertTablet(Tablet &tablet, bool sorted) {
+ if (sorted) {
+ if (!checkSorted(tablet)) {
+ throw BatchExecutionException("Times in Tablet are not in ascending order");
+ }
+ } else {
+ sortTablet(tablet);
+ }
+
+ shared_ptr <TSInsertTabletReq> request(new TSInsertTabletReq());
+ request->__set_sessionId(sessionId);
+ request->prefixPath = tablet.deviceId;
+ for (pair <string, TSDataType::TSDataType> schema : tablet.schemas) {
+ request->measurements.push_back(schema.first);
+ request->types.push_back(schema.second);
+ }
+ request->__set_timestamps(SessionUtils::getTime(tablet));
+ request->__set_values(SessionUtils::getValue(tablet));
+ request->__set_size(tablet.rowSize);
+
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->insertTablet(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw new IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::insertTablets(map<string, Tablet *> &tablets) {
+ try {
+ insertTablets(tablets, false);
+ }
+ catch (const exception &e) {
+ logic_error error(e.what());
+ throw exception(error);
+ }
+}
+
+void Session::insertTablets(map<string, Tablet *> &tablets, bool sorted) {
+ shared_ptr <TSInsertTabletsReq> request(new TSInsertTabletsReq());
+ request->__set_sessionId(sessionId);
+
+ for (auto &item : tablets) {
+ if (sorted) {
+ if (!checkSorted(*(item.second))) {
+ throw BatchExecutionException("Times in Tablet are not in ascending order");
+ }
+ } else {
+ sortTablet(*(tablets[item.first]));
+ }
+
+ request->deviceIds.push_back(item.second->deviceId);
+ vector <string> measurements;
+ vector<int> dataTypes;
+ for (pair <string, TSDataType::TSDataType> schema : item.second->schemas) {
+ measurements.push_back(schema.first);
+ dataTypes.push_back(schema.second);
+ }
+ request->measurementsList.push_back(measurements);
+ request->typesList.push_back(dataTypes);
+ request->timestampsList.push_back(SessionUtils::getTime(*(item.second)));
+ request->valuesList.push_back(SessionUtils::getValue(*(item.second)));
+ request->sizeList.push_back(item.second->rowSize);
+
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->insertTablets(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (const exception &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+ }
+}
+
+void Session::testInsertRecord(string deviceId, int64_t time, vector <string> &measurements, vector <string> &values) {
+ shared_ptr <TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
+ req->__set_sessionId(sessionId);
+ req->__set_deviceId(deviceId);
+ req->__set_timestamp(time);
+ req->__set_measurements(measurements);
+ req->__set_values(values);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->insertStringRecord(*resp, *req);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::testInsertTablet(Tablet &tablet) {
+ shared_ptr <TSInsertTabletReq> request(new TSInsertTabletReq());
+ request->__set_sessionId(sessionId);
+ request->prefixPath = tablet.deviceId;
+ for (pair <string, TSDataType::TSDataType> schema : tablet.schemas) {
+ request->measurements.push_back(schema.first);
+ request->types.push_back(schema.second);
+ }
+ request->__set_timestamps(SessionUtils::getTime(tablet));
+ request->__set_values(SessionUtils::getValue(tablet));
+ request->__set_size(tablet.rowSize);
+
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->testInsertTablet(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw new IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::testInsertRecords(vector <string> &deviceIds, vector <int64_t> ×,
+ vector <vector<string>> &measurementsList, vector <vector<string>> &valuesList) {
+ int len = deviceIds.size();
+ if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
+ logic_error error("deviceIds, times, measurementsList and valuesList's size should be equal");
+ throw exception(error);
+ }
+ shared_ptr <TSInsertStringRecordsReq> request(new TSInsertStringRecordsReq());
+ request->__set_sessionId(sessionId);
+ request->__set_deviceIds(deviceIds);
+ request->__set_timestamps(times);
+ request->__set_measurementsList(measurementsList);
+ request->__set_valuesList(valuesList);
+
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->insertStringRecords(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::deleteTimeseries(string path) {
+ vector <string> paths;
+ paths.push_back(path);
+ deleteTimeseries(paths);
+}
+
+void Session::deleteTimeseries(vector <string> &paths) {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->deleteTimeseries(*resp, sessionId, paths);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::deleteData(string path, int64_t time) {
+ vector <string> paths;
+ paths.push_back(path);
+ deleteData(paths, time);
+}
+
+void Session::deleteData(vector <string> &deviceId, int64_t time) {
+ shared_ptr <TSDeleteDataReq> req(new TSDeleteDataReq());
+ req->__set_sessionId(sessionId);
+ req->__set_paths(deviceId);
+ req->__set_endTime(time);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->deleteData(*resp, *req);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (exception &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::setStorageGroup(string storageGroupId) {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->setStorageGroup(*resp, sessionId, storageGroupId);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::deleteStorageGroup(string storageGroup) {
+ vector <string> storageGroups;
+ storageGroups.push_back(storageGroup);
+ deleteStorageGroups(storageGroups);
+}
+
+void Session::deleteStorageGroups(vector <string> &storageGroups) {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->deleteStorageGroups(*resp, sessionId, storageGroups);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::createTimeseries(string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
+ CompressionType::CompressionType compressor) {
+ try {
+ createTimeseries(path, dataType, encoding, compressor, NULL, NULL, NULL, "");
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::createTimeseries(string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
+ CompressionType::CompressionType compressor,
+ map <string, string> *props, map <string, string> *tags,
+ map <string, string> *attributes, string measurementAlias) {
+ shared_ptr <TSCreateTimeseriesReq> req(new TSCreateTimeseriesReq());
+ req->__set_sessionId(sessionId);
+ req->__set_path(path);
+ req->__set_dataType(dataType);
+ req->__set_encoding(encoding);
+ req->__set_compressor(compressor);
+ if (props != NULL) {
+ req->__set_props(*props);
+ }
+
+ if (tags != NULL) {
+ req->__set_tags(*tags);
+ }
+ if (attributes != NULL) {
+ req->__set_attributes(*attributes);
+ }
+ if (measurementAlias != "") {
+ req->__set_measurementAlias(measurementAlias);
+ }
+
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->createTimeseries(*resp, *req);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::createMultiTimeseries(vector <string> paths, vector <TSDataType::TSDataType> dataTypes,
+ vector <TSEncoding::TSEncoding> encodings,
+ vector <CompressionType::CompressionType> compressors,
+ vector <map<string, string>> *propsList, vector <map<string, string>> *tagsList,
+ vector <map<string, string>> *attributesList,
+ vector <string> *measurementAliasList) {
+ shared_ptr <TSCreateMultiTimeseriesReq> request(new TSCreateMultiTimeseriesReq());
+ request->__set_sessionId(sessionId);
+ request->__set_paths(paths);
+
+ vector<int> dataTypesOrdinal;
+ for (TSDataType::TSDataType dataType : dataTypes) {
+ dataTypesOrdinal.push_back(dataType);
+ }
+ request->__set_dataTypes(dataTypesOrdinal);
+
+ vector<int> encodingsOrdinal;
+ for (TSEncoding::TSEncoding encoding : encodings) {
+ encodingsOrdinal.push_back(encoding);
+ }
+ request->__set_encodings(encodingsOrdinal);
+
+ vector<int> compressorsOrdinal;
+ for (CompressionType::CompressionType compressor: compressors) {
+ compressorsOrdinal.push_back(compressor);
+ }
+ request->__set_compressors(compressorsOrdinal);
+
+ if (propsList != NULL) {
+ request->__set_propsList(*propsList);
+ }
+
+ if (tagsList != NULL) {
+ request->__set_tagsList(*tagsList);
+ }
+ if (attributesList != NULL) {
+ request->__set_attributesList(*attributesList);
+ }
+ if (measurementAliasList != NULL) {
+ request->__set_measurementAliasList(*measurementAliasList);
+ }
+
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->createMultiTimeseries(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+bool Session::checkTimeseriesExists(string path) {
+ try {
+ std::unique_ptr <SessionDataSet> dataset = executeQueryStatement("SHOW TIMESERIES " + path);
+ bool isExisted = dataset->hasNext();
+ dataset->closeOperationHandle();
+ return isExisted;
+ }
+ catch (exception e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+string Session::getTimeZone() {
+ if (zoneId != "") {
+ return zoneId;
+ }
+ shared_ptr <TSGetTimeZoneResp> resp(new TSGetTimeZoneResp());
+ try {
+ client->getTimeZone(*resp, sessionId);
+ RpcUtils::verifySuccess(resp->status);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+ return resp->timeZone;
+}
+
+void Session::setTimeZone(string zoneId) {
+ shared_ptr <TSSetTimeZoneReq> req(new TSSetTimeZoneReq());
+ req->__set_sessionId(sessionId);
+ req->__set_timeZone(zoneId);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->setTimeZone(*resp, *req);
+ }
+ catch (IoTDBConnectionException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+ RpcUtils::verifySuccess(*resp);
+ this->zoneId = zoneId;
+}
+
+unique_ptr <SessionDataSet> Session::executeQueryStatement(string sql) {
+ shared_ptr <TSExecuteStatementReq> req(new TSExecuteStatementReq());
+ req->__set_sessionId(sessionId);
+ req->__set_statementId(statementId);
+ req->__set_statement(sql);
+ req->__set_fetchSize(fetchSize);
+ shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+ try {
+ client->executeStatement(*resp, *req);
+ RpcUtils::verifySuccess(resp->status);
+ }
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(e.what());
+ }
+ shared_ptr <TSQueryDataSet> queryDataSet(new TSQueryDataSet(resp->queryDataSet));
+ return unique_ptr<SessionDataSet>(new SessionDataSet(
+ sql, resp->columns, resp->dataTypeList, resp->queryId, statementId, client, sessionId, queryDataSet));
+}
+
+void Session::executeNonQueryStatement(string sql) {
+ shared_ptr <TSExecuteStatementReq> req(new TSExecuteStatementReq());
+ req->__set_sessionId(sessionId);
+ req->__set_statementId(statementId);
+ req->__set_statement(sql);
+ shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+ try {
+ client->executeUpdateStatement(*resp, *req);
+ RpcUtils::verifySuccess(resp->status);
+ }
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index 4b0dc7a..3155e83 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -1,737 +1,737 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <string>
-#include <vector>
-#include <exception>
-#include <iostream>
-#include <algorithm>
-#include <map>
-#include <utility>
-#include <memory>
-#include <new>
-#include <thread>
-#include <mutex>
-#include <stdexcept>
-#include <cstdlib>
-#include <thrift/protocol/TBinaryProtocol.h>
-#include <thrift/protocol/TCompactProtocol.h>
-#include <thrift/transport/TSocket.h>
-#include <thrift/transport/TTransportException.h>
-#include <thrift/transport/TBufferTransports.h>
-#include "TSIService.h"
-
-using ::apache::thrift::protocol::TBinaryProtocol;
-using ::apache::thrift::protocol::TCompactProtocol;
-using ::apache::thrift::transport::TSocket;
-using ::apache::thrift::transport::TTransport;
-using ::apache::thrift::transport::TTransportException;
-using ::apache::thrift::transport::TBufferedTransport;
-using ::apache::thrift::transport::TFramedTransport;
-using ::apache::thrift::TException;
-
-class IoTDBConnectionException : public std::exception {
-public:
- IoTDBConnectionException() : message() {}
-
- IoTDBConnectionException(const char *m) : message(m) {}
-
- IoTDBConnectionException(std::string m) : message(m) {}
-
- virtual const char *what() const throw() {
- return message.c_str();
- }
-
-private:
- std::string message;
-};
-
-class BatchExecutionException : public std::exception {
-public:
- BatchExecutionException() : message() {}
-
- BatchExecutionException(const char *m) : message(m) {}
-
- BatchExecutionException(std::string m) : message(m) {}
-
- BatchExecutionException(std::vector <TSStatus> statusList) : message(), statusList(statusList) {}
-
- BatchExecutionException(std::vector <TSStatus> statusList, std::string m) : message(m), statusList(statusList) {}
-
- virtual const char *what() const throw() {
- return message.c_str();
- }
-
- std::vector <TSStatus> statusList;
-private:
- std::string message;
-
-};
-
-class UnSupportedDataTypeException : public std::exception {
-private:
- std::string message;
-public:
- UnSupportedDataTypeException() : message() {}
-
- UnSupportedDataTypeException(const char *m) : message(m) {}
-
- UnSupportedDataTypeException(std::string m) : message("UnSupported dataType: " + m) {}
-};
-
-namespace CompressionType {
-
- enum CompressionType {
- UNCOMPRESSED, SNAPPY, GZIP, LZO, SDT, PAA, PLA, LZ4
- };
-}
-namespace TSDataType {
- enum TSDataType {
- BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, NULLTYPE
- };
-}
-namespace TSEncoding {
- enum TSEncoding {
- PLAIN = 0,
- DICTIONARY = 1,
- RLE = 2,
- DIFF = 3,
- TS_2DIFF = 4,
- BITMAP = 5,
- GORILLA_V1 = 6,
- REGULAR = 7,
- GORILLA = 8,
- };
-}
-namespace TSStatusCode {
- enum TSStatusCode {
- SUCCESS_STATUS = 200,
- STILL_EXECUTING_STATUS = 201,
- INVALID_HANDLE_STATUS = 202,
-
- NODE_DELETE_FAILED_ERROR = 298,
- ALIAS_ALREADY_EXIST_ERROR = 299,
- PATH_ALREADY_EXIST_ERROR = 300,
- PATH_NOT_EXIST_ERROR = 301,
- UNSUPPORTED_FETCH_METADATA_OPERATION_ERROR = 302,
- METADATA_ERROR = 303,
- OUT_OF_TTL_ERROR = 305,
- CONFIG_ADJUSTER = 306,
- MERGE_ERROR = 307,
- SYSTEM_CHECK_ERROR = 308,
- SYNC_DEVICE_OWNER_CONFLICT_ERROR = 309,
- SYNC_CONNECTION_EXCEPTION = 310,
- STORAGE_GROUP_PROCESSOR_ERROR = 311,
- STORAGE_GROUP_ERROR = 312,
- STORAGE_ENGINE_ERROR = 313,
- TSFILE_PROCESSOR_ERROR = 314,
- PATH_ILLEGAL = 315,
- LOAD_FILE_ERROR = 316,
- STORAGE_GROUP_NOT_READY = 317,
-
- EXECUTE_STATEMENT_ERROR = 400,
- SQL_PARSE_ERROR = 401,
- GENERATE_TIME_ZONE_ERROR = 402,
- SET_TIME_ZONE_ERROR = 403,
- NOT_STORAGE_GROUP_ERROR = 404,
- QUERY_NOT_ALLOWED = 405,
- AST_FORMAT_ERROR = 406,
- LOGICAL_OPERATOR_ERROR = 407,
- LOGICAL_OPTIMIZE_ERROR = 408,
- UNSUPPORTED_FILL_TYPE_ERROR = 409,
- PATH_ERROR = 410,
- QUERY_PROCESS_ERROR = 411,
- WRITE_PROCESS_ERROR = 412,
-
- INTERNAL_SERVER_ERROR = 500,
- CLOSE_OPERATION_ERROR = 501,
- READ_ONLY_SYSTEM_ERROR = 502,
- DISK_SPACE_INSUFFICIENT_ERROR = 503,
- START_UP_ERROR = 504,
- SHUT_DOWN_ERROR = 505,
- MULTIPLE_ERROR = 506,
- WRONG_LOGIN_PASSWORD_ERROR = 600,
- NOT_LOGIN_ERROR = 601,
- NO_PERMISSION_ERROR = 602,
- UNINITIALIZED_AUTH_ERROR = 603,
- PARTITION_NOT_READY = 700,
- TIME_OUT = 701,
- NO_LEADER = 702,
- UNSUPPORTED_OPERATION = 703,
- NODE_READ_ONLY = 704,
- INCOMPATIBLE_VERSION = 203,
- };
-}
-
-class RpcUtils {
-public:
- std::shared_ptr <TSStatus> SUCCESS_STATUS;
-
- RpcUtils() {
- SUCCESS_STATUS = std::make_shared<TSStatus>();
- SUCCESS_STATUS->__set_code(TSStatusCode::SUCCESS_STATUS);
- }
-
- static void verifySuccess(TSStatus &status);
-
- static void verifySuccess(std::vector <TSStatus> &statuses);
-
- static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
-
- static TSStatus getStatus(int code, std::string message);
-
- static std::shared_ptr <TSExecuteStatementResp> getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
-
- static std::shared_ptr <TSExecuteStatementResp>
- getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, std::string message);
-
- static std::shared_ptr <TSExecuteStatementResp> getTSExecuteStatementResp(TSStatus &status);
-
- static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
-
- static std::shared_ptr <TSFetchResultsResp>
- getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, std::string appendMessage);
-
- static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(TSStatus &status);
-};
-
-// Simulate the ByteBuffer class in Java
-class MyStringBuffer {
-private:
- char *getchar(int len) {
- char *ret = new char[len];
- for (int i = pos; i < pos + len; i++)
- ret[pos + len - 1 - i] = str[i];
- pos += len;
- return ret;
- }
-
- void putchar(int len, char *ins) {
- for (int i = len - 1; i > -1; i--)
- str += ins[i];
- }
-
-public:
- std::string str;
- int pos;
-
- bool hasRemaining() {
- return pos < str.size();
- }
-
- MyStringBuffer() {}
-
- MyStringBuffer(std::string str) {
- this->str = str;
- this->pos = 0;
- }
-
- int getInt() {
- char *data = getchar(4);
- int ret = *(int *) data;
- delete[]data;
- return ret;
- }
-
- int64_t getLong() {
- char *data = getchar(8);
- int64_t ret = *(int64_t *) data;
- delete[]data;
- return ret;
- }
-
- float getFloat() {
- char *data = getchar(4);
- float ret = *(float *) data;
- delete[]data;
- return ret;
- }
-
- double getDouble() {
- char *data = getchar(8);
- double ret = *(double *) data;
- delete[]data;
- return ret;
- }
-
- char getChar() {
- char *data = getchar(1);
- char ret = *(char *) data;
- delete[]data;
- return ret;
- }
-
- bool getBool() {
- return getChar() == 1;
- }
-
- std::string getString() {
- int len = getInt();
- std::string ret;
- for (int i = 0; i < len; i++) ret.append(1, getChar());
- return ret;
- }
-
- void putInt(int ins) {
- char *data = (char *) &ins;
- putchar(4, data);
- }
-
- void putLong(int64_t ins) {
- char *data = (char *) &ins;
- putchar(8, data);
- }
-
- void putFloat(float ins) {
- char *data = (char *) &ins;
- putchar(4, data);
- }
-
- void putDouble(double ins) {
- char *data = (char *) &ins;
- putchar(8, data);
- }
-
- void putChar(char ins) {
- char *data = (char *) &ins;
- putchar(1, data);
- }
-
- void putBool(bool ins) {
- char tmp = 0;
- if (ins) tmp = 1;
- putChar(tmp);
- }
-
- void putString(std::string ins) {
- int len = ins.size();
- putInt(len);
- for (int i = 0; i < len; i++) putChar(ins[i]);
- }
-};
-
-class Field {
-public:
- TSDataType::TSDataType dataType;
- bool boolV;
- int intV;
- int64_t longV;
- float floatV;
- double doubleV;
- std::string stringV;
-
- Field(TSDataType::TSDataType a) {
- dataType = a;
- }
-
- Field() {}
-};
-
-/*
- * A tablet data of one device, the tablet contains multiple measurements of this device that share
- * the same time column.
- *
- * for example: device root.sg1.d1
- *
- * time, m1, m2, m3
- * 1, 1, 2, 3
- * 2, 1, 2, 3
- * 3, 1, 2, 3
- *
- * Notice: The tablet should not have empty cell
- *
- */
-class Tablet {
-private:
- static const int DEFAULT_SIZE = 1024;
-public:
- std::string deviceId; // deviceId of this tablet
- std::vector <std::pair<std::string, TSDataType::TSDataType>> schemas; // the list of measurement schemas for creating the tablet
- std::vector <int64_t> timestamps; //timestamps in this tablet
- std::vector <std::vector<std::string>> values;
- int rowSize; //the number of rows to include in this tablet
- int maxRowNumber; // the maximum number of rows for this tablet
-
- Tablet() {}
-
- /**
- * Return a tablet with default specified row number. This is the standard
- * constructor (all Tablet should be the same size).
- *
- * @param deviceId the name of the device specified to be written in
- * @param timeseries the list of measurement schemas for creating the tablet
- */
- Tablet(std::string deviceId, std::vector <std::pair<std::string, TSDataType::TSDataType>> ×eries) {
- Tablet(deviceId, timeseries, DEFAULT_SIZE);
- }
-
- /**
- * Return a tablet with the specified number of rows (maxBatchSize). Only
- * call this constructor directly for testing purposes. Tablet should normally
- * always be default size.
- *
- * @param deviceId the name of the device specified to be written in
- * @param schemas the list of measurement schemas for creating the row
- * batch
- * @param maxRowNumber the maximum number of rows for this tablet
- */
- Tablet(std::string deviceId, std::vector <std::pair<std::string, TSDataType::TSDataType>> &schemas,
- int maxRowNumber) {
- this->deviceId = deviceId;
- this->schemas = schemas;
- this->maxRowNumber = maxRowNumber;
-
- // create timestamp column
- timestamps.resize(maxRowNumber);
- // create value columns
- values.resize(schemas.size());
- for (int i = 0; i < schemas.size(); i++) {
- values[i].resize(maxRowNumber);
- }
-
- this->rowSize = 0;
- }
-
- void reset(); // Reset Tablet to the default state - set the rowSize to 0
- void createColumns();
-
- int getTimeBytesSize();
-
- int getValueByteSize(); // total byte size that values occupies
-};
-
-class SessionUtils {
-public:
- static std::string getTime(Tablet &tablet);
-
- static std::string getValue(Tablet &tablet);
-};
-
-class RowRecord {
-public:
- int64_t timestamp;
- std::vector <Field> fields;
-
- RowRecord(int64_t timestamp) {
- this->timestamp = timestamp;
- }
-
- RowRecord(int64_t timestamp, std::vector <Field> &fields) {
- this->timestamp = timestamp;
- this->fields = fields;
- }
-
- RowRecord() {
- this->timestamp = -1;
- }
-
- void addField(Field &f) {
- this->fields.push_back(f);
- }
-
- std::string toString() {
- std::string ret = std::to_string(timestamp);
- for (int i = 0; i < fields.size(); i++) {
- ret.append("\t");
- TSDataType::TSDataType dataType = fields[i].dataType;
- switch (dataType) {
- case TSDataType::BOOLEAN: {
- std::string field = fields[i].boolV ? "true" : "false";
- ret.append(field);
- break;
- }
- case TSDataType::INT32: {
- ret.append(std::to_string(fields[i].intV));
- break;
- }
- case TSDataType::INT64: {
- ret.append(std::to_string(fields[i].longV));
- break;
- }
- case TSDataType::FLOAT: {
- ret.append(std::to_string(fields[i].floatV));
- break;
- }
- case TSDataType::DOUBLE: {
- ret.append(std::to_string(fields[i].doubleV));
- break;
- }
- case TSDataType::TEXT: {
- ret.append(fields[i].stringV);
- break;
- }
- case TSDataType::NULLTYPE: {
- ret.append("NULL");
- }
- }
- }
- ret.append("\n");
- return ret;
- }
-};
-
-class SessionDataSet {
-private:
- bool hasCachedRecord = false;
- std::string sql;
- int64_t queryId;
- int64_t statementId;
- int64_t sessionId;
- std::shared_ptr <TSIServiceIf> client;
- int batchSize = 1024;
- std::vector <std::string> columnNameList;
- std::vector <std::string> columnTypeDeduplicatedList;
- // duplicated column index -> origin index
- std::map<int, int> duplicateLocation;
- // column name -> column location
- std::map<std::string, int> columnMap;
- // column size
- int columnSize = 0;
-
- int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
- std::shared_ptr <TSQueryDataSet> tsQueryDataSet;
- MyStringBuffer tsQueryDataSetTimeBuffer;
- std::vector <std::unique_ptr<MyStringBuffer>> valueBuffers;
- std::vector <std::unique_ptr<MyStringBuffer>> bitmapBuffers;
- RowRecord rowRecord;
- char *currentBitmap = NULL; // used to cache the current bitmap for every column
- static const int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null
-
-public:
- SessionDataSet() {}
-
- SessionDataSet(std::string sql, std::vector <std::string> &columnNameList,
- std::vector <std::string> &columnTypeList, int64_t queryId, int64_t statementId,
- std::shared_ptr <TSIServiceIf> client, int64_t sessionId,
- std::shared_ptr <TSQueryDataSet> queryDataSet) : tsQueryDataSetTimeBuffer(queryDataSet->time) {
- this->sessionId = sessionId;
- this->sql = sql;
- this->queryId = queryId;
- this->statementId = statementId;
- this->client = client;
- this->columnNameList = columnNameList;
- this->currentBitmap = new char[columnNameList.size()];
- this->columnSize = columnNameList.size();
-
- // column name -> column location
- for (int i = 0; i < columnNameList.size(); i++) {
- std::string name = columnNameList[i];
- if (this->columnMap.find(name) != this->columnMap.end()) {
- duplicateLocation[i] = columnMap[name];
- } else {
- this->columnMap[name] = i;
- this->columnTypeDeduplicatedList.push_back(columnTypeList[i]);
- }
- this->valueBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->valueList[i])));
- this->bitmapBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->bitmapList[i])));
- }
- this->tsQueryDataSet = queryDataSet;
- }
-
- ~SessionDataSet() {
- if (currentBitmap != NULL) {
- delete[] currentBitmap;
- currentBitmap = NULL;
- }
- }
-
- int getBatchSize();
-
- void setBatchSize(int batchSize);
-
- std::vector <std::string> getColumnNames();
-
- bool hasNext();
-
- void constructOneRow();
-
- bool isNull(int index, int rowNum);
-
- RowRecord *next();
-
- void closeOperationHandle();
-};
-
-template<typename T>
-std::vector <T> sortList(std::vector <T> &valueList, int *index, int indexLength) {
- std::vector <T> sortedValues(valueList.size());
- for (int i = 0; i < indexLength; i++) {
- sortedValues[i] = valueList[index[i]];
- }
- return sortedValues;
-}
-
-class Session {
-private:
- std::string host;
- int rpcPort;
- std::string username;
- std::string password;
- TSProtocolVersion::type protocolVersion = TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3;
- std::shared_ptr <TSIServiceIf> client;
- std::shared_ptr <apache::thrift::transport::TSocket> transport;
- bool isClosed = true;
- int64_t sessionId;
- int64_t statementId;
- std::string zoneId;
- int fetchSize;
- const static int DEFAULT_FETCH_SIZE = 10000;
- const static int DEFAULT_TIMEOUT_MS = 0;
-
- bool checkSorted(Tablet &tablet);
-
- bool checkSorted(std::vector <int64_t> ×);
-
- void sortTablet(Tablet &tablet);
-
- void sortIndexByTimestamp(int *index, std::vector <int64_t> ×tamps, int length);
-
- std::string getTimeZone();
-
- void setTimeZone(std::string zoneId);
-
- void appendValues(std::string &buffer, char *value, int size);
-
- void
- putValuesIntoBuffer(std::vector <TSDataType::TSDataType> &types, std::vector<char *> &values, std::string &buf);
-
- int8_t getDataTypeNumber(TSDataType::TSDataType type);
-
-public:
- Session(std::string host, int rpcPort) : username("user"), password("password") {
- this->host = host;
- this->rpcPort = rpcPort;
- }
-
- Session(std::string host, int rpcPort, std::string username, std::string password)
- : fetchSize(10000) {
- this->host = host;
- this->rpcPort = rpcPort;
- this->username = username;
- this->password = password;
- this->zoneId = "UTC+08:00";
- }
-
- Session(std::string host, int rpcPort, std::string username, std::string password, int fetchSize) {
- this->host = host;
- this->rpcPort = rpcPort;
- this->username = username;
- this->password = password;
- this->fetchSize = fetchSize;
- this->zoneId = "UTC+08:00";
- }
-
- Session(std::string host, std::string rpcPort, std::string username = "user",
- std::string password = "password", int fetchSize = 10000) {
- this->host = host;
- this->rpcPort = stoi(rpcPort);
- this->username = username;
- this->password = password;
- this->fetchSize = fetchSize;
- this->zoneId = "UTC+08:00";
- }
-
- void open();
-
- void open(bool enableRPCCompression);
-
- void open(bool enableRPCCompression, int connectionTimeoutInMs);
-
- void close();
-
- void insertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
- std::vector <std::string> &values);
-
- void insertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
- std::vector <TSDataType::TSDataType> &types, std::vector<char *> &values);
-
- void insertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> ×,
- std::vector <std::vector<std::string>> &measurementsList,
- std::vector <std::vector<std::string>> &valuesList);
-
- void insertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> ×,
- std::vector <std::vector<std::string>> &measurementsList,
- std::vector <std::vector<TSDataType::TSDataType>> typesList,
- std::vector <std::vector<char *>> &valuesList);
-
- void insertRecordsOfOneDevice(std::string deviceId, std::vector <int64_t> ×,
- std::vector <std::vector<std::string>> measurementsList,
- std::vector <std::vector<TSDataType::TSDataType>> typesList,
- std::vector <std::vector<char *>> &valuesList);
-
- void insertRecordsOfOneDevice(std::string deviceId, std::vector <int64_t> ×,
- std::vector <std::vector<std::string>> measurementsList,
- std::vector <std::vector<TSDataType::TSDataType>> typesList,
- std::vector <std::vector<char *>> &valuesList, bool sorted);
-
- void insertTablet(Tablet &tablet);
-
- void insertTablet(Tablet &tablet, bool sorted);
-
- void insertTablets(std::map<std::string, Tablet *> &tablets);
-
- void insertTablets(std::map<std::string, Tablet *> &tablets, bool sorted);
-
- void testInsertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
- std::vector <std::string> &values);
-
- void testInsertTablet(Tablet &tablet);
-
- void testInsertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> ×,
- std::vector <std::vector<std::string>> &measurementsList,
- std::vector <std::vector<std::string>> &valuesList);
-
- void deleteTimeseries(std::string path);
-
- void deleteTimeseries(std::vector <std::string> &paths);
-
- void deleteData(std::string path, int64_t time);
-
- void deleteData(std::vector <std::string> &deviceId, int64_t time);
-
- void setStorageGroup(std::string storageGroupId);
-
- void deleteStorageGroup(std::string storageGroup);
-
- void deleteStorageGroups(std::vector <std::string> &storageGroups);
-
- void createTimeseries(std::string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
- CompressionType::CompressionType compressor);
-
- void createTimeseries(std::string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
- CompressionType::CompressionType compressor,
- std::map <std::string, std::string> *props, std::map <std::string, std::string> *tags,
- std::map <std::string, std::string> *attributes, std::string measurementAlias);
-
- void createMultiTimeseries(std::vector <std::string> paths, std::vector <TSDataType::TSDataType> dataTypes,
- std::vector <TSEncoding::TSEncoding> encodings,
- std::vector <CompressionType::CompressionType> compressors,
- std::vector <std::map<std::string, std::string>> *propsList,
- std::vector <std::map<std::string, std::string>> *tagsList,
- std::vector <std::map<std::string, std::string>> *attributesList,
- std::vector <std::string> *measurementAliasList);
-
- bool checkTimeseriesExists(std::string path);
-
- std::unique_ptr <SessionDataSet> executeQueryStatement(std::string sql);
-
- void executeNonQueryStatement(std::string sql);
-};
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <string>
+#include <vector>
+#include <exception>
+#include <iostream>
+#include <algorithm>
+#include <map>
+#include <utility>
+#include <memory>
+#include <new>
+#include <thread>
+#include <mutex>
+#include <stdexcept>
+#include <cstdlib>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/protocol/TCompactProtocol.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TTransportException.h>
+#include <thrift/transport/TBufferTransports.h>
+#include "TSIService.h"
+
+using ::apache::thrift::protocol::TBinaryProtocol;
+using ::apache::thrift::protocol::TCompactProtocol;
+using ::apache::thrift::transport::TSocket;
+using ::apache::thrift::transport::TTransport;
+using ::apache::thrift::transport::TTransportException;
+using ::apache::thrift::transport::TBufferedTransport;
+using ::apache::thrift::transport::TFramedTransport;
+using ::apache::thrift::TException;
+
+class IoTDBConnectionException : public std::exception {
+public:
+ IoTDBConnectionException() : message() {}
+
+ IoTDBConnectionException(const char *m) : message(m) {}
+
+ IoTDBConnectionException(std::string m) : message(m) {}
+
+ virtual const char *what() const throw() {
+ return message.c_str();
+ }
+
+private:
+ std::string message;
+};
+
+class BatchExecutionException : public std::exception {
+public:
+ BatchExecutionException() : message() {}
+
+ BatchExecutionException(const char *m) : message(m) {}
+
+ BatchExecutionException(std::string m) : message(m) {}
+
+ BatchExecutionException(std::vector <TSStatus> statusList) : message(), statusList(statusList) {}
+
+ BatchExecutionException(std::vector <TSStatus> statusList, std::string m) : message(m), statusList(statusList) {}
+
+ virtual const char *what() const throw() {
+ return message.c_str();
+ }
+
+ std::vector <TSStatus> statusList;
+private:
+ std::string message;
+
+};
+
+class UnSupportedDataTypeException : public std::exception {
+private:
+ std::string message;
+public:
+ UnSupportedDataTypeException() : message() {}
+
+ UnSupportedDataTypeException(const char *m) : message(m) {}
+
+ UnSupportedDataTypeException(std::string m) : message("UnSupported dataType: " + m) {}
+};
+
+namespace CompressionType {
+
+ enum CompressionType {
+ UNCOMPRESSED, SNAPPY, GZIP, LZO, SDT, PAA, PLA, LZ4
+ };
+}
+namespace TSDataType {
+ enum TSDataType {
+ BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, NULLTYPE
+ };
+}
+namespace TSEncoding {
+ enum TSEncoding {
+ PLAIN = 0,
+ DICTIONARY = 1,
+ RLE = 2,
+ DIFF = 3,
+ TS_2DIFF = 4,
+ BITMAP = 5,
+ GORILLA_V1 = 6,
+ REGULAR = 7,
+ GORILLA = 8,
+ };
+}
+namespace TSStatusCode {
+ enum TSStatusCode {
+ SUCCESS_STATUS = 200,
+ STILL_EXECUTING_STATUS = 201,
+ INVALID_HANDLE_STATUS = 202,
+
+ NODE_DELETE_FAILED_ERROR = 298,
+ ALIAS_ALREADY_EXIST_ERROR = 299,
+ PATH_ALREADY_EXIST_ERROR = 300,
+ PATH_NOT_EXIST_ERROR = 301,
+ UNSUPPORTED_FETCH_METADATA_OPERATION_ERROR = 302,
+ METADATA_ERROR = 303,
+ OUT_OF_TTL_ERROR = 305,
+ CONFIG_ADJUSTER = 306,
+ MERGE_ERROR = 307,
+ SYSTEM_CHECK_ERROR = 308,
+ SYNC_DEVICE_OWNER_CONFLICT_ERROR = 309,
+ SYNC_CONNECTION_EXCEPTION = 310,
+ STORAGE_GROUP_PROCESSOR_ERROR = 311,
+ STORAGE_GROUP_ERROR = 312,
+ STORAGE_ENGINE_ERROR = 313,
+ TSFILE_PROCESSOR_ERROR = 314,
+ PATH_ILLEGAL = 315,
+ LOAD_FILE_ERROR = 316,
+ STORAGE_GROUP_NOT_READY = 317,
+
+ EXECUTE_STATEMENT_ERROR = 400,
+ SQL_PARSE_ERROR = 401,
+ GENERATE_TIME_ZONE_ERROR = 402,
+ SET_TIME_ZONE_ERROR = 403,
+ NOT_STORAGE_GROUP_ERROR = 404,
+ QUERY_NOT_ALLOWED = 405,
+ AST_FORMAT_ERROR = 406,
+ LOGICAL_OPERATOR_ERROR = 407,
+ LOGICAL_OPTIMIZE_ERROR = 408,
+ UNSUPPORTED_FILL_TYPE_ERROR = 409,
+ PATH_ERROR = 410,
+ QUERY_PROCESS_ERROR = 411,
+ WRITE_PROCESS_ERROR = 412,
+
+ INTERNAL_SERVER_ERROR = 500,
+ CLOSE_OPERATION_ERROR = 501,
+ READ_ONLY_SYSTEM_ERROR = 502,
+ DISK_SPACE_INSUFFICIENT_ERROR = 503,
+ START_UP_ERROR = 504,
+ SHUT_DOWN_ERROR = 505,
+ MULTIPLE_ERROR = 506,
+ WRONG_LOGIN_PASSWORD_ERROR = 600,
+ NOT_LOGIN_ERROR = 601,
+ NO_PERMISSION_ERROR = 602,
+ UNINITIALIZED_AUTH_ERROR = 603,
+ PARTITION_NOT_READY = 700,
+ TIME_OUT = 701,
+ NO_LEADER = 702,
+ UNSUPPORTED_OPERATION = 703,
+ NODE_READ_ONLY = 704,
+ INCOMPATIBLE_VERSION = 203,
+ };
+}
+
+class RpcUtils {
+public:
+ std::shared_ptr <TSStatus> SUCCESS_STATUS;
+
+ RpcUtils() {
+ SUCCESS_STATUS = std::make_shared<TSStatus>();
+ SUCCESS_STATUS->__set_code(TSStatusCode::SUCCESS_STATUS);
+ }
+
+ static void verifySuccess(TSStatus &status);
+
+ static void verifySuccess(std::vector <TSStatus> &statuses);
+
+ static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
+
+ static TSStatus getStatus(int code, std::string message);
+
+ static std::shared_ptr <TSExecuteStatementResp> getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
+
+ static std::shared_ptr <TSExecuteStatementResp>
+ getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, std::string message);
+
+ static std::shared_ptr <TSExecuteStatementResp> getTSExecuteStatementResp(TSStatus &status);
+
+ static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
+
+ static std::shared_ptr <TSFetchResultsResp>
+ getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, std::string appendMessage);
+
+ static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(TSStatus &status);
+};
+
+// Simulate the ByteBuffer class in Java
+class MyStringBuffer {
+private:
+ char *getchar(int len) {
+ char *ret = new char[len];
+ for (int i = pos; i < pos + len; i++)
+ ret[pos + len - 1 - i] = str[i];
+ pos += len;
+ return ret;
+ }
+
+ void putchar(int len, char *ins) {
+ for (int i = len - 1; i > -1; i--)
+ str += ins[i];
+ }
+
+public:
+ std::string str;
+ int pos;
+
+ bool hasRemaining() {
+ return pos < str.size();
+ }
+
+ MyStringBuffer() {}
+
+ MyStringBuffer(std::string str) {
+ this->str = str;
+ this->pos = 0;
+ }
+
+ int getInt() {
+ char *data = getchar(4);
+ int ret = *(int *) data;
+ delete[]data;
+ return ret;
+ }
+
+ int64_t getLong() {
+ char *data = getchar(8);
+ int64_t ret = *(int64_t *) data;
+ delete[]data;
+ return ret;
+ }
+
+ float getFloat() {
+ char *data = getchar(4);
+ float ret = *(float *) data;
+ delete[]data;
+ return ret;
+ }
+
+ double getDouble() {
+ char *data = getchar(8);
+ double ret = *(double *) data;
+ delete[]data;
+ return ret;
+ }
+
+ char getChar() {
+ char *data = getchar(1);
+ char ret = *(char *) data;
+ delete[]data;
+ return ret;
+ }
+
+ bool getBool() {
+ return getChar() == 1;
+ }
+
+ std::string getString() {
+ int len = getInt();
+ std::string ret;
+ for (int i = 0; i < len; i++) ret.append(1, getChar());
+ return ret;
+ }
+
+ void putInt(int ins) {
+ char *data = (char *) &ins;
+ putchar(4, data);
+ }
+
+ void putLong(int64_t ins) {
+ char *data = (char *) &ins;
+ putchar(8, data);
+ }
+
+ void putFloat(float ins) {
+ char *data = (char *) &ins;
+ putchar(4, data);
+ }
+
+ void putDouble(double ins) {
+ char *data = (char *) &ins;
+ putchar(8, data);
+ }
+
+ void putChar(char ins) {
+ char *data = (char *) &ins;
+ putchar(1, data);
+ }
+
+ void putBool(bool ins) {
+ char tmp = 0;
+ if (ins) tmp = 1;
+ putChar(tmp);
+ }
+
+ void putString(std::string ins) {
+ int len = ins.size();
+ putInt(len);
+ for (int i = 0; i < len; i++) putChar(ins[i]);
+ }
+};
+
+class Field {
+public:
+ TSDataType::TSDataType dataType;
+ bool boolV;
+ int intV;
+ int64_t longV;
+ float floatV;
+ double doubleV;
+ std::string stringV;
+
+ Field(TSDataType::TSDataType a) {
+ dataType = a;
+ }
+
+ Field() {}
+};
+
+/*
+ * A tablet data of one device, the tablet contains multiple measurements of this device that share
+ * the same time column.
+ *
+ * for example: device root.sg1.d1
+ *
+ * time, m1, m2, m3
+ * 1, 1, 2, 3
+ * 2, 1, 2, 3
+ * 3, 1, 2, 3
+ *
+ * Notice: The tablet should not have empty cell
+ *
+ */
+class Tablet {
+private:
+ static const int DEFAULT_SIZE = 1024;
+public:
+ std::string deviceId; // deviceId of this tablet
+ std::vector <std::pair<std::string, TSDataType::TSDataType>> schemas; // the list of measurement schemas for creating the tablet
+ std::vector <int64_t> timestamps; //timestamps in this tablet
+ std::vector <std::vector<std::string>> values;
+ int rowSize; //the number of rows to include in this tablet
+ int maxRowNumber; // the maximum number of rows for this tablet
+
+ Tablet() {}
+
+ /**
+ * Return a tablet with default specified row number. This is the standard
+ * constructor (all Tablet should be the same size).
+ *
+ * @param deviceId the name of the device specified to be written in
+ * @param timeseries the list of measurement schemas for creating the tablet
+ */
+ Tablet(std::string deviceId, std::vector <std::pair<std::string, TSDataType::TSDataType>> ×eries) {
+ Tablet(deviceId, timeseries, DEFAULT_SIZE);
+ }
+
+ /**
+ * Return a tablet with the specified number of rows (maxBatchSize). Only
+ * call this constructor directly for testing purposes. Tablet should normally
+ * always be default size.
+ *
+ * @param deviceId the name of the device specified to be written in
+ * @param schemas the list of measurement schemas for creating the row
+ * batch
+ * @param maxRowNumber the maximum number of rows for this tablet
+ */
+ Tablet(std::string deviceId, std::vector <std::pair<std::string, TSDataType::TSDataType>> &schemas,
+ int maxRowNumber) {
+ this->deviceId = deviceId;
+ this->schemas = schemas;
+ this->maxRowNumber = maxRowNumber;
+
+ // create timestamp column
+ timestamps.resize(maxRowNumber);
+ // create value columns
+ values.resize(schemas.size());
+ for (int i = 0; i < schemas.size(); i++) {
+ values[i].resize(maxRowNumber);
+ }
+
+ this->rowSize = 0;
+ }
+
+ void reset(); // Reset Tablet to the default state - set the rowSize to 0
+ void createColumns();
+
+ int getTimeBytesSize();
+
+ int getValueByteSize(); // total byte size that values occupies
+};
+
+class SessionUtils {
+public:
+ static std::string getTime(Tablet &tablet);
+
+ static std::string getValue(Tablet &tablet);
+};
+
+class RowRecord {
+public:
+ int64_t timestamp;
+ std::vector <Field> fields;
+
+ RowRecord(int64_t timestamp) {
+ this->timestamp = timestamp;
+ }
+
+ RowRecord(int64_t timestamp, std::vector <Field> &fields) {
+ this->timestamp = timestamp;
+ this->fields = fields;
+ }
+
+ RowRecord() {
+ this->timestamp = -1;
+ }
+
+ void addField(Field &f) {
+ this->fields.push_back(f);
+ }
+
+ std::string toString() {
+ std::string ret = std::to_string(timestamp);
+ for (int i = 0; i < fields.size(); i++) {
+ ret.append("\t");
+ TSDataType::TSDataType dataType = fields[i].dataType;
+ switch (dataType) {
+ case TSDataType::BOOLEAN: {
+ std::string field = fields[i].boolV ? "true" : "false";
+ ret.append(field);
+ break;
+ }
+ case TSDataType::INT32: {
+ ret.append(std::to_string(fields[i].intV));
+ break;
+ }
+ case TSDataType::INT64: {
+ ret.append(std::to_string(fields[i].longV));
+ break;
+ }
+ case TSDataType::FLOAT: {
+ ret.append(std::to_string(fields[i].floatV));
+ break;
+ }
+ case TSDataType::DOUBLE: {
+ ret.append(std::to_string(fields[i].doubleV));
+ break;
+ }
+ case TSDataType::TEXT: {
+ ret.append(fields[i].stringV);
+ break;
+ }
+ case TSDataType::NULLTYPE: {
+ ret.append("NULL");
+ }
+ }
+ }
+ ret.append("\n");
+ return ret;
+ }
+};
+
+class SessionDataSet {
+private:
+ bool hasCachedRecord = false;
+ std::string sql;
+ int64_t queryId;
+ int64_t statementId;
+ int64_t sessionId;
+ std::shared_ptr <TSIServiceIf> client;
+ int batchSize = 1024;
+ std::vector <std::string> columnNameList;
+ std::vector <std::string> columnTypeDeduplicatedList;
+ // duplicated column index -> origin index
+ std::map<int, int> duplicateLocation;
+ // column name -> column location
+ std::map<std::string, int> columnMap;
+ // column size
+ int columnSize = 0;
+
+ int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
+ std::shared_ptr <TSQueryDataSet> tsQueryDataSet;
+ MyStringBuffer tsQueryDataSetTimeBuffer;
+ std::vector <std::unique_ptr<MyStringBuffer>> valueBuffers;
+ std::vector <std::unique_ptr<MyStringBuffer>> bitmapBuffers;
+ RowRecord rowRecord;
+ char *currentBitmap = NULL; // used to cache the current bitmap for every column
+ static const int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null
+
+public:
+ SessionDataSet() {}
+
+ SessionDataSet(std::string sql, std::vector <std::string> &columnNameList,
+ std::vector <std::string> &columnTypeList, int64_t queryId, int64_t statementId,
+ std::shared_ptr <TSIServiceIf> client, int64_t sessionId,
+ std::shared_ptr <TSQueryDataSet> queryDataSet) : tsQueryDataSetTimeBuffer(queryDataSet->time) {
+ this->sessionId = sessionId;
+ this->sql = sql;
+ this->queryId = queryId;
+ this->statementId = statementId;
+ this->client = client;
+ this->columnNameList = columnNameList;
+ this->currentBitmap = new char[columnNameList.size()];
+ this->columnSize = columnNameList.size();
+
+ // column name -> column location
+ for (int i = 0; i < columnNameList.size(); i++) {
+ std::string name = columnNameList[i];
+ if (this->columnMap.find(name) != this->columnMap.end()) {
+ duplicateLocation[i] = columnMap[name];
+ } else {
+ this->columnMap[name] = i;
+ this->columnTypeDeduplicatedList.push_back(columnTypeList[i]);
+ }
+ this->valueBuffers.push_back(
+ std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->valueList[i])));
+ this->bitmapBuffers.push_back(
+ std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->bitmapList[i])));
+ }
+ this->tsQueryDataSet = queryDataSet;
+ }
+
+ ~SessionDataSet() {
+ if (currentBitmap != NULL) {
+ delete[] currentBitmap;
+ currentBitmap = NULL;
+ }
+ }
+
+ int getBatchSize();
+
+ void setBatchSize(int batchSize);
+
+ std::vector <std::string> getColumnNames();
+
+ bool hasNext();
+
+ void constructOneRow();
+
+ bool isNull(int index, int rowNum);
+
+ RowRecord *next();
+
+ void closeOperationHandle();
+};
+
+template<typename T>
+std::vector <T> sortList(std::vector <T> &valueList, int *index, int indexLength) {
+ std::vector <T> sortedValues(valueList.size());
+ for (int i = 0; i < indexLength; i++) {
+ sortedValues[i] = valueList[index[i]];
+ }
+ return sortedValues;
+}
+
+class Session {
+private:
+ std::string host;
+ int rpcPort;
+ std::string username;
+ std::string password;
+ TSProtocolVersion::type protocolVersion = TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3;
+ std::shared_ptr <TSIServiceIf> client;
+ std::shared_ptr <apache::thrift::transport::TSocket> transport;
+ bool isClosed = true;
+ int64_t sessionId;
+ int64_t statementId;
+ std::string zoneId;
+ int fetchSize;
+ const static int DEFAULT_FETCH_SIZE = 10000;
+ const static int DEFAULT_TIMEOUT_MS = 0;
+
+ bool checkSorted(Tablet &tablet);
+
+ bool checkSorted(std::vector <int64_t> ×);
+
+ void sortTablet(Tablet &tablet);
+
+ void sortIndexByTimestamp(int *index, std::vector <int64_t> ×tamps, int length);
+
+ std::string getTimeZone();
+
+ void setTimeZone(std::string zoneId);
+
+ void appendValues(std::string &buffer, char *value, int size);
+
+ void
+ putValuesIntoBuffer(std::vector <TSDataType::TSDataType> &types, std::vector<char *> &values, std::string &buf);
+
+ int8_t getDataTypeNumber(TSDataType::TSDataType type);
+
+public:
+ Session(std::string host, int rpcPort) : username("user"), password("password") {
+ this->host = host;
+ this->rpcPort = rpcPort;
+ }
+
+ Session(std::string host, int rpcPort, std::string username, std::string password)
+ : fetchSize(10000) {
+ this->host = host;
+ this->rpcPort = rpcPort;
+ this->username = username;
+ this->password = password;
+ this->zoneId = "UTC+08:00";
+ }
+
+ Session(std::string host, int rpcPort, std::string username, std::string password, int fetchSize) {
+ this->host = host;
+ this->rpcPort = rpcPort;
+ this->username = username;
+ this->password = password;
+ this->fetchSize = fetchSize;
+ this->zoneId = "UTC+08:00";
+ }
+
+ Session(std::string host, std::string rpcPort, std::string username = "user",
+ std::string password = "password", int fetchSize = 10000) {
+ this->host = host;
+ this->rpcPort = stoi(rpcPort);
+ this->username = username;
+ this->password = password;
+ this->fetchSize = fetchSize;
+ this->zoneId = "UTC+08:00";
+ }
+
+ void open();
+
+ void open(bool enableRPCCompression);
+
+ void open(bool enableRPCCompression, int connectionTimeoutInMs);
+
+ void close();
+
+ void insertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
+ std::vector <std::string> &values);
+
+ void insertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
+ std::vector <TSDataType::TSDataType> &types, std::vector<char *> &values);
+
+ void insertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> ×,
+ std::vector <std::vector<std::string>> &measurementsList,
+ std::vector <std::vector<std::string>> &valuesList);
+
+ void insertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> ×,
+ std::vector <std::vector<std::string>> &measurementsList,
+ std::vector <std::vector<TSDataType::TSDataType>> typesList,
+ std::vector <std::vector<char *>> &valuesList);
+
+ void insertRecordsOfOneDevice(std::string deviceId, std::vector <int64_t> ×,
+ std::vector <std::vector<std::string>> measurementsList,
+ std::vector <std::vector<TSDataType::TSDataType>> typesList,
+ std::vector <std::vector<char *>> &valuesList);
+
+ void insertRecordsOfOneDevice(std::string deviceId, std::vector <int64_t> ×,
+ std::vector <std::vector<std::string>> measurementsList,
+ std::vector <std::vector<TSDataType::TSDataType>> typesList,
+ std::vector <std::vector<char *>> &valuesList, bool sorted);
+
+ void insertTablet(Tablet &tablet);
+
+ void insertTablet(Tablet &tablet, bool sorted);
+
+ void insertTablets(std::map<std::string, Tablet *> &tablets);
+
+ void insertTablets(std::map<std::string, Tablet *> &tablets, bool sorted);
+
+ void testInsertRecord(std::string deviceId, int64_t time, std::vector <std::string> &measurements,
+ std::vector <std::string> &values);
+
+ void testInsertTablet(Tablet &tablet);
+
+ void testInsertRecords(std::vector <std::string> &deviceIds, std::vector <int64_t> ×,
+ std::vector <std::vector<std::string>> &measurementsList,
+ std::vector <std::vector<std::string>> &valuesList);
+
+ void deleteTimeseries(std::string path);
+
+ void deleteTimeseries(std::vector <std::string> &paths);
+
+ void deleteData(std::string path, int64_t time);
+
+ void deleteData(std::vector <std::string> &deviceId, int64_t time);
+
+ void setStorageGroup(std::string storageGroupId);
+
+ void deleteStorageGroup(std::string storageGroup);
+
+ void deleteStorageGroups(std::vector <std::string> &storageGroups);
+
+ void createTimeseries(std::string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
+ CompressionType::CompressionType compressor);
+
+ void createTimeseries(std::string path, TSDataType::TSDataType dataType, TSEncoding::TSEncoding encoding,
+ CompressionType::CompressionType compressor,
+ std::map <std::string, std::string> *props, std::map <std::string, std::string> *tags,
+ std::map <std::string, std::string> *attributes, std::string measurementAlias);
+
+ void createMultiTimeseries(std::vector <std::string> paths, std::vector <TSDataType::TSDataType> dataTypes,
+ std::vector <TSEncoding::TSEncoding> encodings,
+ std::vector <CompressionType::CompressionType> compressors,
+ std::vector <std::map<std::string, std::string>> *propsList,
+ std::vector <std::map<std::string, std::string>> *tagsList,
+ std::vector <std::map<std::string, std::string>> *attributesList,
+ std::vector <std::string> *measurementAliasList);
+
+ bool checkTimeseriesExists(std::string path);
+
+ std::unique_ptr <SessionDataSet> executeQueryStatement(std::string sql);
+
+ void executeNonQueryStatement(std::string sql);
+};
diff --git a/pom.xml b/pom.xml
index 6f9da49..3e312c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -639,6 +639,7 @@
</importOrder>
<removeUnusedImports/>
</java>
+ <lineEndings>UNIX</lineEndings>
</configuration>
<executions>
<execution>