You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/03/29 02:44:52 UTC
[iotdb] branch master updated: [IOTDB-5714] Fix some IoTDB C++ SDK bugs (#9409)
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8ae4dbb187 [IOTDB-5714] Fix some IoTDB C++ SDK bugs (#9409)
8ae4dbb187 is described below
commit 8ae4dbb187c5f521c404437c9b4319c6e749a0ad
Author: Jamber <ja...@sina.com>
AuthorDate: Wed Mar 29 10:44:46 2023 +0800
[IOTDB-5714] Fix some IoTDB C++ SDK bugs (#9409)
---
client-cpp/src/main/Session.cpp | 486 +++++++++++++++++----
client-cpp/src/main/Session.h | 114 +++--
client-cpp/src/test/cpp/sessionIT.cpp | 220 +++++++++-
.../src/AlignedTimeseriesSessionExample.cpp | 8 +-
example/client-cpp-example/src/SessionExample.cpp | 9 +-
5 files changed, 680 insertions(+), 157 deletions(-)
diff --git a/client-cpp/src/main/Session.cpp b/client-cpp/src/main/Session.cpp
index 29968b26b4..fa9ef10b13 100644
--- a/client-cpp/src/main/Session.cpp
+++ b/client-cpp/src/main/Session.cpp
@@ -20,9 +20,17 @@
#include "Session.h"
#include <algorithm>
#include <memory>
+#include <time.h>
using namespace std;
+/**
+* Timeout of query can be set by users.
+* A negative number means using the default configuration of server.
+* And value 0 will disable the function of query timeout.
+*/
+static const int64_t QUERY_TIMEOUT_MS = -1;
+
LogLevelType LOG_LEVEL = LEVEL_DEBUG;
TSDataType::TSDataType getTSDataTypeFromString(const string &str) {
@@ -44,7 +52,7 @@ void RpcUtils::verifySuccess(const TSStatus &status) {
}
if (status.code != TSStatusCode::SUCCESS_STATUS
&& status.code != TSStatusCode::REDIRECTION_RECOMMEND) {
- throw ExecutionException(to_string(status.code) + ": " + status.message);
+ throw ExecutionException(to_string(status.code) + ": " + status.message, status);
}
}
@@ -382,16 +390,18 @@ string SessionUtils::getValue(const Tablet &tablet) {
return valueBuffer.str;
}
-int SessionDataSet::getBatchSize() {
- return batchSize;
+int SessionDataSet::getFetchSize() {
+ return fetchSize;
}
-void SessionDataSet::setBatchSize(int batchSize) {
- this->batchSize = batchSize;
+void SessionDataSet::setFetchSize(int fetchSize) {
+ this->fetchSize = fetchSize;
}
vector<string> SessionDataSet::getColumnNames() { return this->columnNameList; }
+vector<string> SessionDataSet::getColumnTypeList() { return this->columnTypeList; }
+
bool SessionDataSet::hasNext() {
if (hasCachedRecord) {
return true;
@@ -400,9 +410,10 @@ bool SessionDataSet::hasNext() {
TSFetchResultsReq req;
req.__set_sessionId(sessionId);
req.__set_statement(sql);
- req.__set_fetchSize(batchSize);
+ req.__set_fetchSize(fetchSize);
req.__set_queryId(queryId);
req.__set_isAlign(true);
+ req.__set_timeout(-1);
try {
TSFetchResultsResp resp;
client->fetchResults(resp, req);
@@ -411,24 +422,31 @@ bool SessionDataSet::hasNext() {
if (!resp.hasResultSet) {
return false;
} else {
- tsQueryDataSet = make_shared<TSQueryDataSet>(resp.queryDataSet);
+ TSQueryDataSet *tsQueryDataSet = &(resp.queryDataSet);
tsQueryDataSetTimeBuffer.str = tsQueryDataSet->time;
tsQueryDataSetTimeBuffer.pos = 0;
- for (size_t i = 0; i < columnNameList.size(); i++) {
- valueBuffers.pop_back();
- bitmapBuffers.pop_back();
- }
- for (size_t i = 0; i < columnNameList.size(); i++) {
+
+ valueBuffers.clear();
+ bitmapBuffers.clear();
+
+ for (size_t i = columnFieldStartIndex; i < columnNameList.size(); i++) {
+ if (duplicateLocation.find(i) != duplicateLocation.end()) {
+ continue;
+ }
std::string name = columnNameList[i];
- valueBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(new MyStringBuffer(tsQueryDataSet->valueList[columnMap[name]])));
- bitmapBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(new MyStringBuffer(tsQueryDataSet->bitmapList[columnMap[name]])));
+ int valueIndex = columnMap[name];
+ valueBuffers.emplace_back(new MyStringBuffer(tsQueryDataSet->valueList[valueIndex]));
+ bitmapBuffers.emplace_back(new MyStringBuffer(tsQueryDataSet->bitmapList[valueIndex]));
}
rowsIndex = 0;
}
- }
- catch (IoTDBException &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (exception &e) {
throw IoTDBException(string("Cannot fetch result from server: ") + e.what());
}
}
@@ -441,7 +459,7 @@ bool SessionDataSet::hasNext() {
void SessionDataSet::constructOneRow() {
vector<Field> outFields;
int loc = 0;
- for (int i = 0; i < columnSize; i++) {
+ for (size_t i = columnFieldStartIndex; i < columnNameList.size(); i++) {
Field field;
if (duplicateLocation.find(i) != duplicateLocation.end()) {
field = outFields[duplicateLocation[i]];
@@ -454,7 +472,7 @@ void SessionDataSet::constructOneRow() {
if (!isNull(loc, rowsIndex)) {
MyStringBuffer *valueBuffer = valueBuffers[loc].get();
- TSDataType::TSDataType dataType = getTSDataTypeFromString(columnTypeDeduplicatedList[loc]);
+ TSDataType::TSDataType dataType = getTSDataTypeFromString(columnTypeList[i]);
field.dataType = dataType;
switch (dataType) {
case TSDataType::BOOLEAN: {
@@ -489,7 +507,7 @@ void SessionDataSet::constructOneRow() {
}
default: {
throw UnSupportedDataTypeException(
- string("Data type ") + columnTypeDeduplicatedList[i] + " is not supported.");
+ string("Data type ") + columnTypeList[i] + " is not supported.");
}
}
} else {
@@ -540,8 +558,13 @@ void SessionDataSet::closeOperationHandle(bool forceClose) {
try {
client->closeOperation(tsStatus, closeReq);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (IoTDBException &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -800,6 +823,24 @@ string Session::getVersionString(Version::Version version) {
}
}
+void Session::initZoneId() {
+ if (!zoneId.empty()) {
+ return;
+ }
+
+ time_t ts = 0;
+ struct tm tmv;
+#if defined(_WIN64) || defined (WIN32) || defined (_WIN32)
+ localtime_s(&tmv, &ts);
+#else
+ localtime_r(&ts, &tmv);
+#endif
+
+ char zoneStr[32];
+ strftime(zoneStr, sizeof(zoneStr), "%z", &tmv);
+ zoneId = zoneStr;
+}
+
void Session::open() {
open(false, DEFAULT_TIMEOUT_MS);
}
@@ -861,8 +902,15 @@ void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
} else {
zoneId = getTimeZone();
}
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ transport->close();
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ transport->close();
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
transport->close();
throw IoTDBException(e.what());
@@ -885,8 +933,10 @@ void Session::close() {
req.__set_sessionId(sessionId);
TSStatus tsStatus;
client->closeSession(tsStatus, req);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const exception &e) {
log_debug(e.what());
errMsg = errMsg + "Session::close() client->closeSession() error, maybe remote server is down. " + e.what() + "\n" ;
needThrowException = true;
@@ -923,8 +973,13 @@ void Session::insertRecord(const string &deviceId, int64_t time,
try {
client->insertStringRecord(respStatus, req);
RpcUtils::verifySuccess(respStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -947,6 +1002,12 @@ void Session::insertRecord(const string &prefixPath, int64_t time,
try {
client->insertRecord(respStatus, req);
RpcUtils::verifySuccess(respStatus);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
@@ -967,8 +1028,13 @@ void Session::insertAlignedRecord(const string &deviceId, int64_t time,
try {
client->insertStringRecord(respStatus, req);
RpcUtils::verifySuccess(respStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -991,6 +1057,12 @@ void Session::insertAlignedRecord(const string &prefixPath, int64_t time,
try {
client->insertRecord(respStatus, req);
RpcUtils::verifySuccess(respStatus);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
@@ -1018,8 +1090,13 @@ void Session::insertRecords(const vector<string> &deviceIds,
TSStatus respStatus;
client->insertStringRecords(respStatus, request);
RpcUtils::verifySuccess(respStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1053,6 +1130,12 @@ void Session::insertRecords(const vector<string> &deviceIds,
TSStatus respStatus;
client->insertRecords(respStatus, request);
RpcUtils::verifySuccess(respStatus);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
@@ -1080,8 +1163,13 @@ void Session::insertAlignedRecords(const vector<string> &deviceIds,
TSStatus respStatus;
client->insertStringRecords(respStatus, request);
RpcUtils::verifySuccess(respStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1115,6 +1203,12 @@ void Session::insertAlignedRecords(const vector<string> &deviceIds,
TSStatus respStatus;
client->insertRecords(respStatus, request);
RpcUtils::verifySuccess(respStatus);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
@@ -1167,6 +1261,12 @@ void Session::insertRecordsOfOneDevice(const string &deviceId,
TSStatus respStatus;
client->insertRecordsOfOneDevice(respStatus, request);
RpcUtils::verifySuccess(respStatus);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
@@ -1219,6 +1319,12 @@ void Session::insertAlignedRecordsOfOneDevice(const string &deviceId,
TSStatus respStatus;
client->insertRecordsOfOneDevice(respStatus, request);
RpcUtils::verifySuccess(respStatus);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
} catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
@@ -1262,8 +1368,13 @@ void Session::insertTablet(const TSInsertTabletReq &request){
TSStatus respStatus;
client->insertTablet(respStatus, request);
RpcUtils::verifySuccess(respStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1335,8 +1446,13 @@ void Session::insertTablets(unordered_map<string, Tablet *> &tablets, bool sorte
TSStatus respStatus;
client->insertTablets(respStatus, request);
RpcUtils::verifySuccess(respStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1369,8 +1485,13 @@ void Session::testInsertRecord(const string &deviceId, int64_t time, const vecto
try {
client->insertStringRecord(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1392,8 +1513,13 @@ void Session::testInsertTablet(const Tablet &tablet) {
TSStatus tsStatus;
client->testInsertTablet(tsStatus, request);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1419,8 +1545,13 @@ void Session::testInsertRecords(const vector<string> &deviceIds,
TSStatus tsStatus;
client->insertStringRecords(tsStatus, request);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1438,30 +1569,45 @@ void Session::deleteTimeseries(const vector<string> &paths) {
try {
client->deleteTimeseries(tsStatus, sessionId, paths);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
}
-void Session::deleteData(const string &path, int64_t time) {
+void Session::deleteData(const string &path, int64_t endTime) {
vector<string> paths;
paths.push_back(path);
- deleteData(paths, time);
+ deleteData(paths, LONG_LONG_MIN, endTime);
}
-void Session::deleteData(const vector<string> &deviceId, int64_t time) {
+void Session::deleteData(const vector<string> &paths, int64_t endTime) {
+ deleteData(paths, LONG_LONG_MIN, endTime);
+}
+
+void Session::deleteData(const vector<string> &paths, int64_t startTime, int64_t endTime) {
TSDeleteDataReq req;
req.__set_sessionId(sessionId);
- req.__set_paths(deviceId);
- req.__set_endTime(time);
+ req.__set_paths(paths);
+ req.__set_startTime(startTime);
+ req.__set_endTime(endTime);
TSStatus tsStatus;
try {
client->deleteData(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1472,8 +1618,13 @@ void Session::setStorageGroup(const string &storageGroupId) {
try {
client->setStorageGroup(tsStatus, sessionId, storageGroupId);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1490,8 +1641,13 @@ void Session::deleteStorageGroups(const vector<string> &storageGroups) {
try {
client->deleteStorageGroups(tsStatus, sessionId, storageGroups);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1510,7 +1666,6 @@ void Session::createTimeseries(const string &path,
}
}
-// TODO:
void Session::createTimeseries(const string &path,
TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
@@ -1543,8 +1698,13 @@ void Session::createTimeseries(const string &path,
try {
client->createTimeseries(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1601,8 +1761,13 @@ void Session::createMultiTimeseries(const vector<string> &paths,
TSStatus tsStatus;
client->createMultiTimeseries(tsStatus, request);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1643,8 +1808,13 @@ void Session::createAlignedTimeseries(const std::string &deviceId,
TSStatus tsStatus;
client->createAlignedTimeseries(tsStatus, request);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1675,8 +1845,13 @@ string Session::getTimeZone() {
try {
client->getTimeZone(resp, sessionId);
RpcUtils::verifySuccess(resp.status);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1690,27 +1865,42 @@ void Session::setTimeZone(const string &zoneId) {
TSStatus tsStatus;
try {
client->setTimeZone(tsStatus, req);
- }
- catch (const exception &e) {
+ RpcUtils::verifySuccess(tsStatus);
+ this->zoneId = zoneId;
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
- RpcUtils::verifySuccess(tsStatus);
- this->zoneId = zoneId;
}
unique_ptr<SessionDataSet> Session::executeQueryStatement(const string &sql) {
+ return executeQueryStatement(sql, QUERY_TIMEOUT_MS);
+}
+
+unique_ptr<SessionDataSet> Session::executeQueryStatement(const string &sql, int64_t timeoutInMs) {
TSExecuteStatementReq req;
req.__set_sessionId(sessionId);
req.__set_statementId(statementId);
req.__set_statement(sql);
+ req.__set_timeout(timeoutInMs);
req.__set_fetchSize(fetchSize);
TSExecuteStatementResp resp;
try {
client->executeStatement(resp, req);
RpcUtils::verifySuccess(resp.status);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
throw IoTDBException(e.what());
}
shared_ptr<TSQueryDataSet> queryDataSet(new TSQueryDataSet(resp.queryDataSet));
@@ -1724,14 +1914,78 @@ void Session::executeNonQueryStatement(const string &sql) {
req.__set_sessionId(sessionId);
req.__set_statementId(statementId);
req.__set_statement(sql);
+ req.__set_timeout(0); //0 means no timeout. This value keep consistent to JAVA SDK.
TSExecuteStatementResp resp;
try {
client->executeUpdateStatement(resp, req);
RpcUtils::verifySuccess(resp.status);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
+ throw IoTDBException(e.what());
}
- catch (const exception &e) {
+}
+
+unique_ptr<SessionDataSet> Session::executeRawDataQuery(const vector<string> &paths, int64_t startTime, int64_t endTime) {
+ TSRawDataQueryReq req;
+ req.__set_sessionId(sessionId);
+ req.__set_statementId(statementId);
+ req.__set_fetchSize(fetchSize);
+ req.__set_paths(paths);
+ req.__set_startTime(startTime);
+ req.__set_endTime(endTime);
+ TSExecuteStatementResp resp;
+ try {
+ client->executeRawDataQuery(resp, req);
+ RpcUtils::verifySuccess(resp.status);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
+ throw IoTDBException(e.what());
+ }
+ shared_ptr<TSQueryDataSet> queryDataSet(new TSQueryDataSet(resp.queryDataSet));
+ return unique_ptr<SessionDataSet>(
+ new SessionDataSet("", resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.ignoreTimeStamp,
+ resp.queryId, statementId, client, sessionId, queryDataSet));
+}
+
+
+unique_ptr<SessionDataSet> Session::executeLastDataQuery(const vector<string> &paths) {
+ return executeLastDataQuery(paths, LONG_LONG_MIN);
+}
+unique_ptr<SessionDataSet> Session::executeLastDataQuery(const vector<string> &paths, int64_t lastTime) {
+ TSLastDataQueryReq req;
+ req.__set_sessionId(sessionId);
+ req.__set_statementId(statementId);
+ req.__set_fetchSize(fetchSize);
+ req.__set_paths(paths);
+ req.__set_time(lastTime);
+
+ TSExecuteStatementResp resp;
+ try {
+ client->executeLastDataQuery(resp, req);
+ RpcUtils::verifySuccess(resp.status);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
throw IoTDBException(e.what());
}
+ shared_ptr<TSQueryDataSet> queryDataSet(new TSQueryDataSet(resp.queryDataSet));
+ return unique_ptr<SessionDataSet>(
+ new SessionDataSet("", resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.ignoreTimeStamp,
+ resp.queryId, statementId, client, sessionId, queryDataSet));
}
void Session::createSchemaTemplate(const Template &templ) {
@@ -1743,8 +1997,13 @@ void Session::createSchemaTemplate(const Template &templ) {
try {
client->createSchemaTemplate(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1759,8 +2018,13 @@ void Session::setSchemaTemplate(const string &template_name, const string &prefi
try {
client->setSchemaTemplate(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1775,8 +2039,13 @@ void Session::unsetSchemaTemplate(const string &prefix_path, const string &templ
try {
client->unsetSchemaTemplate(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1817,8 +2086,13 @@ void Session::addAlignedMeasurementsInTemplate(const string &template_name, cons
try {
client->appendSchemaTemplate(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1869,8 +2143,13 @@ void Session::addUnalignedMeasurementsInTemplate(const string &template_name, co
try {
client->appendSchemaTemplate(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1895,8 +2174,13 @@ void Session::deleteNodeInTemplate(const string &template_name, const string &pa
try {
client->pruneSchemaTemplate(tsStatus, req);
RpcUtils::verifySuccess(tsStatus);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ throw;
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1910,8 +2194,10 @@ int Session::countMeasurementsInTemplate(const string &template_name) {
TSQueryTemplateResp resp;
try {
client->querySchemaTemplate(resp, req);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1927,8 +2213,10 @@ bool Session::isMeasurementInTemplate(const string &template_name, const string
TSQueryTemplateResp resp;
try {
client->querySchemaTemplate(resp, req);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1944,8 +2232,10 @@ bool Session::isPathExistInTemplate(const string &template_name, const string &p
TSQueryTemplateResp resp;
try {
client->querySchemaTemplate(resp, req);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1961,8 +2251,10 @@ std::vector<std::string> Session::showMeasurementsInTemplate(const string &templ
TSQueryTemplateResp resp;
try {
client->querySchemaTemplate(resp, req);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
@@ -1978,8 +2270,10 @@ std::vector<std::string> Session::showMeasurementsInTemplate(const string &templ
TSQueryTemplateResp resp;
try {
client->querySchemaTemplate(resp, req);
- }
- catch (const exception &e) {
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const exception &e) {
log_debug(e.what());
throw IoTDBException(e.what());
}
diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index c852da40f5..b3a9b9a04c 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -42,6 +42,11 @@
#include <thrift/transport/TBufferTransports.h>
#include "IClientRPCService.h"
+//== For compatible with Windows OS ==
+#ifndef LONG_LONG_MIN
+#define LONG_LONG_MIN 0x8000000000000000
+#endif
+
using namespace std;
using ::apache::thrift::protocol::TBinaryProtocol;
@@ -99,6 +104,10 @@ public:
explicit ExecutionException(const char *m) : IoTDBException(m) {}
explicit ExecutionException(const std::string &m) : IoTDBException(m) {}
+
+ explicit ExecutionException(const std::string &m, const TSStatus &tsStatus) : IoTDBException(m), status(tsStatus) {}
+
+ TSStatus status;
};
class BatchExecutionException : public IoTDBException {
@@ -691,21 +700,23 @@ public:
class SessionDataSet {
private:
+ const string TIMESTAMP_STR = "Time";
bool hasCachedRecord = false;
std::string sql;
int64_t queryId;
int64_t statementId;
int64_t sessionId;
std::shared_ptr<IClientRPCServiceIf> client;
- int batchSize = 1024;
+ int fetchSize = 1024;
std::vector<std::string> columnNameList;
- std::vector<std::string> columnTypeDeduplicatedList;
+ std::vector<std::string> columnTypeList;
// duplicated column index -> origin index
std::unordered_map<int, int> duplicateLocation;
// column name -> column location
std::unordered_map<std::string, int> columnMap;
// column size
int columnSize = 0;
+ int columnFieldStartIndex = 0; //Except Timestamp column, 1st field's pos in columnNameList
bool isIgnoreTimeStamp = false;
int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
@@ -733,32 +744,37 @@ public:
this->queryId = queryId;
this->statementId = statementId;
this->client = client;
- this->columnNameList = columnNameList;
this->currentBitmap = new char[columnNameList.size()];
- this->columnSize = (int)columnNameList.size();
this->isIgnoreTimeStamp = isIgnoreTimeStamp;
-
- // column name -> column location
- for (int i = 0; i < (int) columnNameList.size(); i++) {
- std::string name = columnNameList[i];
+ if (!isIgnoreTimeStamp) {
+ columnFieldStartIndex = 1;
+ this->columnNameList.push_back(TIMESTAMP_STR);
+ this->columnTypeList.push_back("INT64");
+ }
+ this->columnNameList.insert(this->columnNameList.end(), columnNameList.begin(), columnNameList.end());
+ this->columnTypeList.insert(this->columnTypeList.end(), columnTypeList.begin(), columnTypeList.end());
+
+ valueBuffers.reserve(queryDataSet->valueList.size());
+ bitmapBuffers.reserve(queryDataSet->bitmapList.size());
+ int deduplicateIdx = 0;
+ std::unordered_map<std::string, int> columnToFirstIndexMap;
+ for (size_t i = columnFieldStartIndex; i < this->columnNameList.size(); i++) {
+ std::string name = this->columnNameList[i];
if (this->columnMap.find(name) != this->columnMap.end()) {
- duplicateLocation[i] = columnMap[name];
+ duplicateLocation[i] = columnToFirstIndexMap[name];
} else {
- this->columnMap[name] = i;
- this->columnTypeDeduplicatedList.push_back(columnTypeList[i]);
- }
- if (!columnNameIndexMap.empty()) {
- this->valueBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(
- new MyStringBuffer(queryDataSet->valueList[columnNameIndexMap[name]])));
- this->bitmapBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(
- new MyStringBuffer(queryDataSet->bitmapList[columnNameIndexMap[name]])));
- } else {
- this->valueBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->valueList[columnMap[name]])));
- this->bitmapBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->bitmapList[columnMap[name]])));
+ columnToFirstIndexMap[name] = i;
+ if (!columnNameIndexMap.empty()) {
+ int valueIndex = columnNameIndexMap[name];
+ this->columnMap[name] = valueIndex;
+ this->valueBuffers.emplace_back(new MyStringBuffer(queryDataSet->valueList[valueIndex]));
+ this->bitmapBuffers.emplace_back(new MyStringBuffer(queryDataSet->bitmapList[valueIndex]));
+ } else {
+ this->columnMap[name] = deduplicateIdx;
+ this->valueBuffers.emplace_back(new MyStringBuffer(queryDataSet->valueList[deduplicateIdx]));
+ this->bitmapBuffers.emplace_back(new MyStringBuffer(queryDataSet->bitmapList[deduplicateIdx]));
+ }
+ deduplicateIdx++;
}
}
this->tsQueryDataSet = queryDataSet;
@@ -779,12 +795,14 @@ public:
}
}
- int getBatchSize();
+ int getFetchSize();
- void setBatchSize(int batchSize);
+ void setFetchSize(int fetchSize);
std::vector<std::string> getColumnNames();
+ std::vector<std::string> getColumnTypeList();
+
bool hasNext();
void constructOneRow();
@@ -955,6 +973,7 @@ private:
const static int DEFAULT_TIMEOUT_MS = 0;
Version::Version version;
+private:
static bool checkSorted(const Tablet &tablet);
static bool checkSorted(const std::vector<int64_t> ×);
@@ -963,10 +982,6 @@ private:
static void sortIndexByTimestamp(int *index, std::vector<int64_t> ×tamps, int length);
- std::string getTimeZone();
-
- void setTimeZone(const std::string &zoneId);
-
void appendValues(std::string &buffer, const char *value, int size);
void
@@ -985,42 +1000,47 @@ private:
std::string getVersionString(Version::Version version);
+ void initZoneId();
+
public:
Session(const std::string &host, int rpcPort) : username("user"), password("password"), version(Version::V_1_0) {
this->host = host;
this->rpcPort = rpcPort;
+ initZoneId();
}
Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password)
- : fetchSize(10000) {
+ : fetchSize(DEFAULT_FETCH_SIZE) {
this->host = host;
this->rpcPort = rpcPort;
this->username = username;
this->password = password;
- this->zoneId = "UTC+08:00";
this->version = Version::V_1_0;
+ initZoneId();
}
Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password,
- int fetchSize) {
+ const std::string &zoneId, int fetchSize = DEFAULT_FETCH_SIZE) {
this->host = host;
this->rpcPort = rpcPort;
this->username = username;
this->password = password;
+ this->zoneId = zoneId;
this->fetchSize = fetchSize;
- this->zoneId = "UTC+08:00";
this->version = Version::V_1_0;
+ initZoneId();
}
Session(const std::string &host, const std::string &rpcPort, const std::string &username = "user",
- const std::string &password = "password", int fetchSize = 10000) {
+ const std::string &password = "password", const std::string &zoneId="", int fetchSize = DEFAULT_FETCH_SIZE) {
this->host = host;
this->rpcPort = stoi(rpcPort);
this->username = username;
this->password = password;
+ this->zoneId = zoneId;
this->fetchSize = fetchSize;
- this->zoneId = "UTC+08:00";
this->version = Version::V_1_0;
+ initZoneId();
}
~Session();
@@ -1035,6 +1055,10 @@ public:
void close();
+ void setTimeZone(const std::string &zoneId);
+
+ std::string getTimeZone();
+
void insertRecord(const std::string &deviceId, int64_t time, const std::vector<std::string> &measurements,
const std::vector<std::string> &values);
@@ -1128,9 +1152,11 @@ public:
void deleteTimeseries(const std::vector<std::string> &paths);
- void deleteData(const std::string &path, int64_t time);
+ void deleteData(const std::string &path, int64_t endTime);
+
+ void deleteData(const std::vector<std::string> &paths, int64_t endTime);
- void deleteData(const std::vector<std::string> &deviceId, int64_t time);
+ void deleteData(const std::vector<std::string> &paths, int64_t startTime, int64_t endTime);
void setStorageGroup(const std::string &storageGroupId);
@@ -1164,10 +1190,18 @@ public:
bool checkTimeseriesExists(const std::string &path);
- std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string &sql);
+ std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string &sql) ;
+
+ std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string &sql, int64_t timeoutInMs) ;
void executeNonQueryStatement(const std::string &sql);
+ std::unique_ptr<SessionDataSet> executeRawDataQuery(const std::vector<std::string> &paths, int64_t startTime, int64_t endTime);
+
+ std::unique_ptr<SessionDataSet> executeLastDataQuery(const std::vector<std::string> &paths);
+
+ std::unique_ptr<SessionDataSet> executeLastDataQuery(const std::vector<std::string> &paths, int64_t lastTime);
+
void createSchemaTemplate(const Template &templ);
void setSchemaTemplate(const std::string &template_name, const std::string &prefix_path);
@@ -1213,4 +1247,4 @@ public:
bool checkTemplateExists(const std::string &template_name);
};
-#endif // IOTDB_SESSION_H
\ No newline at end of file
+#endif // IOTDB_SESSION_H
diff --git a/client-cpp/src/test/cpp/sessionIT.cpp b/client-cpp/src/test/cpp/sessionIT.cpp
index b4330b82e8..8016a4e37b 100644
--- a/client-cpp/src/test/cpp/sessionIT.cpp
+++ b/client-cpp/src/test/cpp/sessionIT.cpp
@@ -86,7 +86,7 @@ TEST_CASE("Test insertRecord by string", "[testInsertRecord]") {
session->executeNonQueryStatement("insert into root.test.d1(timestamp,s1, s2, s3) values(100, 1,2,3)");
unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1");
- sessionDataSet->setBatchSize(1024);
+ sessionDataSet->setFetchSize(1024);
int count = 0;
while (sessionDataSet->hasNext()) {
long index = 1;
@@ -131,7 +131,7 @@ TEST_CASE("Test insertRecords ", "[testInsertRecords]") {
}
unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1");
- sessionDataSet->setBatchSize(1024);
+ sessionDataSet->setFetchSize(1024);
int count = 0;
while (sessionDataSet->hasNext()) {
long index = 1;
@@ -149,7 +149,7 @@ TEST_CASE("Test insertRecord with types ", "[testTypedInsertRecord]") {
vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"};
vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64};
- for (int i = 0; i < timeseries.size(); i++) {
+ for (size_t i = 0; i < timeseries.size(); i++) {
if (session->checkTimeseriesExists(timeseries[i])) {
session->deleteTimeseries(timeseries[i]);
}
@@ -167,7 +167,7 @@ TEST_CASE("Test insertRecord with types ", "[testTypedInsertRecord]") {
}
unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1");
- sessionDataSet->setBatchSize(1024);
+ sessionDataSet->setFetchSize(1024);
long count = 0;
while (sessionDataSet->hasNext()) {
sessionDataSet->next();
@@ -181,7 +181,7 @@ TEST_CASE("Test insertRecords with types ", "[testTypedInsertRecords]") {
vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"};
vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64};
- for (int i = 0; i < timeseries.size(); i++) {
+ for (size_t i = 0; i < timeseries.size(); i++) {
if (session->checkTimeseriesExists(timeseries[i])) {
session->deleteTimeseries(timeseries[i]);
}
@@ -210,7 +210,7 @@ TEST_CASE("Test insertRecords with types ", "[testTypedInsertRecords]") {
session->insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1");
- sessionDataSet->setBatchSize(1024);
+ sessionDataSet->setFetchSize(1024);
int count = 0;
while (sessionDataSet->hasNext()) {
sessionDataSet->next();
@@ -224,7 +224,7 @@ TEST_CASE("Test insertRecordsOfOneDevice", "[testInsertRecordsOfOneDevice]") {
vector<string> timeseries = {"root.test.d1.s1", "root.test.d1.s2", "root.test.d1.s3"};
vector<TSDataType::TSDataType> types = {TSDataType::INT32, TSDataType::DOUBLE, TSDataType::INT64};
- for (int i = 0; i < timeseries.size(); i++) {
+ for (size_t i = 0; i < timeseries.size(); i++) {
if (session->checkTimeseriesExists(timeseries[i])) {
session->deleteTimeseries(timeseries[i]);
}
@@ -251,7 +251,7 @@ TEST_CASE("Test insertRecordsOfOneDevice", "[testInsertRecordsOfOneDevice]") {
session->insertRecordsOfOneDevice(deviceId, timestamps, measurementsList, typesList, valuesList);
unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select * from root.test.d1");
- sessionDataSet->setBatchSize(1024);
+ sessionDataSet->setFetchSize(1024);
int count = 0;
while (sessionDataSet->hasNext()) {
sessionDataSet->next();
@@ -287,7 +287,7 @@ TEST_CASE("Test insertTablet ", "[testInsertTablet]") {
tablet.reset();
}
unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1");
- sessionDataSet->setBatchSize(1024);
+ sessionDataSet->setFetchSize(1024);
int count = 0;
while (sessionDataSet->hasNext()) {
long index = 0;
@@ -314,7 +314,7 @@ TEST_CASE("Test Last query ", "[testLastQuery]") {
vector<string> measurementValues = {"1", "2", "3"};
unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement(
"select last s1,s2,s3 from root.test.d1");
- sessionDataSet->setBatchSize(1024);
+ sessionDataSet->setFetchSize(1024);
long index = 0;
while (sessionDataSet->hasNext()) {
vector<Field> fields = sessionDataSet->next()->fields;
@@ -348,7 +348,7 @@ TEST_CASE("Test Huge query ", "[testHugeQuery]") {
}
unique_ptr<SessionDataSet> sessionDataSet = session->executeQueryStatement("select s1,s2,s3 from root.test.d1");
- sessionDataSet->setBatchSize(1024);
+ sessionDataSet->setFetchSize(1024);
RowRecord* rowRecord;
int count = 0;
print_count = 0;
@@ -369,4 +369,200 @@ TEST_CASE("Test Huge query ", "[testHugeQuery]") {
}
REQUIRE(count == total_count);
-}
\ No newline at end of file
+}
+
+
+TEST_CASE("Test executeRawDataQuery ", "[executeRawDataQuery]") {
+ CaseReporter cr("executeRawDataQuery");
+ prepareTimeseries();
+
+ string deviceId = "root.test.d1";
+ vector<string> measurements = {"s1", "s2", "s3"};
+ vector<TSDataType::TSDataType> types = {TSDataType::INT64, TSDataType::INT64, TSDataType::INT64};
+
+ long total_count = 5000;
+ vector<char*> values;
+ int64_t valueArray[3];
+ for (long time = -total_count; time < total_count; time++) {
+ valueArray[0] = time;
+ valueArray[1] = time * 2;
+ valueArray[2] = time * 3;
+ values.clear();
+ values.push_back((char*)&valueArray[0]);
+ values.push_back((char*)&valueArray[1]);
+ values.push_back((char*)&valueArray[2]);
+ session->insertRecord(deviceId, time, measurements, types, values);
+ if (time == 100) { //insert 1 big timestamp data for generate un-seq data.
+ valueArray[0] = 9;
+ valueArray[2] = 999;
+ values.clear();
+ values.push_back((char*)&valueArray[0]);
+ values.push_back((char*)&valueArray[2]);
+ vector<string> measurements2 = {"s1", "s3"};
+ vector<TSDataType::TSDataType> types2 = {TSDataType::INT64, TSDataType::INT64};
+ session->insertRecord(deviceId, 99999, measurements2, types2, values);
+ }
+ }
+
+ vector<string> paths;
+ paths.push_back("root.test.d1.s1");
+ paths.push_back("root.test.d1.s2");
+ paths.push_back("root.test.d1.s3");
+
+ //== Test executeRawDataQuery() with negative timestamp
+ int startTs = -total_count, endTs = total_count;
+ unique_ptr<SessionDataSet> sessionDataSet = session->executeRawDataQuery(paths, startTs, endTs);
+ sessionDataSet->setFetchSize(10);
+ vector<string> columns = sessionDataSet->getColumnNames();
+ columns = sessionDataSet->getColumnNames();
+ for (const string &column : columns) {
+ cout << column << " " ;
+ }
+ cout << endl;
+ REQUIRE(columns[0] == "Time");
+ REQUIRE(columns[1] == paths[0]);
+ REQUIRE(columns[2] == paths[1]);
+ REQUIRE(columns[3] == paths[2]);
+
+ int ts = startTs;
+ while (sessionDataSet->hasNext()) {
+ RowRecord *rowRecordPtr = sessionDataSet->next();
+ //cout << rowRecordPtr->toString();
+
+ vector<Field> fields = rowRecordPtr->fields;
+ REQUIRE(rowRecordPtr->timestamp == ts);
+ REQUIRE(fields[0].dataType == TSDataType::INT64);
+ REQUIRE(fields[0].longV == ts);
+ REQUIRE(fields[1].dataType == TSDataType::INT64);
+ REQUIRE(fields[1].longV == ts * 2);
+ REQUIRE(fields[2].dataType == TSDataType::INT64);
+ REQUIRE(fields[2].longV == ts *3);
+ ts++;
+ }
+
+
+ //== Test executeRawDataQuery() with null field
+ startTs = 99999;
+ endTs = 99999 + 10;
+ sessionDataSet = session->executeRawDataQuery(paths, startTs, endTs);
+
+ sessionDataSet->setFetchSize(10);
+ columns = sessionDataSet->getColumnNames();
+ for (const string &column : columns) {
+ cout << column << " " ;
+ }
+ cout << endl;
+ REQUIRE(columns[0] == "Time");
+ REQUIRE(columns[1] == paths[0]);
+ REQUIRE(columns[2] == paths[1]);
+ REQUIRE(columns[3] == paths[2]);
+ ts = startTs;
+ while (sessionDataSet->hasNext()) {
+ RowRecord *rowRecordPtr = sessionDataSet->next();
+ cout << rowRecordPtr->toString();
+
+ vector<Field> fields = rowRecordPtr->fields;
+ REQUIRE(rowRecordPtr->timestamp == ts);
+ REQUIRE(fields[0].dataType == TSDataType::INT64);
+ REQUIRE(fields[0].longV == 9);
+ REQUIRE(fields[1].dataType == TSDataType::NULLTYPE);
+ REQUIRE(fields[2].dataType == TSDataType::INT64);
+ REQUIRE(fields[2].longV == 999);
+ }
+
+ //== Test executeRawDataQuery() with empty data
+ sessionDataSet = session->executeRawDataQuery(paths, 100000, 110000);
+ sessionDataSet->setFetchSize(1);
+ REQUIRE(sessionDataSet->hasNext() == false);
+}
+
+TEST_CASE("Test executeLastDataQuery ", "[testExecuteLastDataQuery]") {
+ CaseReporter cr("testExecuteLastDataQuery");
+ prepareTimeseries();
+
+ string deviceId = "root.test.d1";
+ vector<string> measurements = {"s1", "s2", "s3"};
+ vector<TSDataType::TSDataType> types = {TSDataType::INT64, TSDataType::INT64, TSDataType::INT64};
+
+ long total_count = 5000;
+ vector<char*> values;
+ int64_t valueArray[3];
+ for (long time = -total_count; time < total_count; time++) {
+ valueArray[0] = time;
+ valueArray[1] = time * 2;
+ valueArray[2] = time * 3;
+ values.clear();
+ values.push_back((char*)&valueArray[0]);
+ values.push_back((char*)&valueArray[1]);
+ values.push_back((char*)&valueArray[2]);
+ session->insertRecord(deviceId, time, measurements, types, values);
+ if (time == 100) { //insert 1 big timestamp data for gen unseq data.
+ valueArray[0] = 9;
+ valueArray[2] = 999;
+ values.clear();
+ values.push_back((char*)&valueArray[0]);
+ values.push_back((char*)&valueArray[2]);
+ vector<string> measurements2 = {"s1", "s3"};
+ vector<TSDataType::TSDataType> types2 = {TSDataType::INT64, TSDataType::INT64};
+ session->insertRecord(deviceId, 99999, measurements2, types2, values);
+ }
+ }
+
+ int64_t tsCheck[3] = {99999, 4999, 99999};
+ std::vector<std::string> valueCheck = {"9", "9998", "999"};
+
+ vector<string> paths;
+ paths.push_back("root.test.d1.s1");
+ paths.push_back("root.test.d1.s2");
+ paths.push_back("root.test.d1.s3");
+
+ //== Test executeLastDataQuery() without lastTime
+ unique_ptr<SessionDataSet> sessionDataSet = session->executeLastDataQuery(paths);
+ sessionDataSet->setFetchSize(1);
+
+ vector<string> columns = sessionDataSet->getColumnNames();
+ for (const string &column : columns) {
+ cout << column << " " ;
+ }
+ cout << endl;
+
+ int index = 0;
+ while (sessionDataSet->hasNext()) {
+ RowRecord *rowRecordPtr = sessionDataSet->next();
+ cout << rowRecordPtr->toString();
+
+ vector<Field> fields = rowRecordPtr->fields;
+ REQUIRE(rowRecordPtr->timestamp == tsCheck[index]);
+ REQUIRE(fields[0].stringV == paths[index]);
+ REQUIRE(fields[1].stringV == valueCheck[index]);
+ REQUIRE(fields[2].stringV == "INT64");
+ index++;
+ }
+
+ //== Test executeLastDataQuery() with negative lastTime
+ sessionDataSet = session->executeLastDataQuery(paths, -200);
+ sessionDataSet->setFetchSize(1);
+ columns = sessionDataSet->getColumnNames();
+ for (const string &column : columns) {
+ cout << column << " " ;
+ }
+ cout << endl;
+
+ index = 0;
+ while (sessionDataSet->hasNext()) {
+ RowRecord *rowRecordPtr = sessionDataSet->next();
+ cout << rowRecordPtr->toString();
+
+ vector<Field> fields = rowRecordPtr->fields;
+ REQUIRE(rowRecordPtr->timestamp == tsCheck[index]);
+ REQUIRE(fields[0].stringV == paths[index]);
+ REQUIRE(fields[1].stringV == valueCheck[index]);
+ REQUIRE(fields[2].stringV == "INT64");
+ index++;
+ }
+
+ //== Test executeLastDataQuery() with the lastTime that is > largest timestamp.
+ sessionDataSet = session->executeLastDataQuery(paths, 100000);
+ sessionDataSet->setFetchSize(1024);
+ REQUIRE(sessionDataSet->hasNext() == false);
+}
diff --git a/example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp b/example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp
index e5c1da1068..8f175cb765 100644
--- a/example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp
+++ b/example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp
@@ -75,7 +75,7 @@ void showDevices() {
}
cout << endl;
- dataSet->setBatchSize(1024);
+ dataSet->setFetchSize(1024);
while (dataSet->hasNext()) {
cout << dataSet->next()->toString();
}
@@ -91,7 +91,7 @@ void showTimeseries() {
}
cout << endl;
- dataSet->setBatchSize(1024);
+ dataSet->setFetchSize(1024);
while (dataSet->hasNext()) {
cout << dataSet->next()->toString();
}
@@ -298,7 +298,7 @@ void query() {
}
cout << endl;
- dataSet->setBatchSize(1024);
+ dataSet->setFetchSize(1024);
while (dataSet->hasNext()) {
cout << dataSet->next()->toString();
}
@@ -352,7 +352,7 @@ int main() {
string errorMessage(e.what());
if (errorMessage.find("StorageGroupAlreadySetException") == string::npos) {
cout << errorMessage << endl;
- throw e;
+ //throw e;
}
}
diff --git a/example/client-cpp-example/src/SessionExample.cpp b/example/client-cpp-example/src/SessionExample.cpp
index e7a3c46b2b..c3f5602cb0 100644
--- a/example/client-cpp-example/src/SessionExample.cpp
+++ b/example/client-cpp-example/src/SessionExample.cpp
@@ -119,7 +119,7 @@ void showTimeseries() {
}
cout << endl;
- dataSet->setBatchSize(1024);
+ dataSet->setFetchSize(1024);
while (dataSet->hasNext()) {
cout << dataSet->next()->toString();
}
@@ -316,13 +316,12 @@ void nonQuery() {
void query() {
unique_ptr<SessionDataSet> dataSet = session->executeQueryStatement("select s1, s2, s3 from root.**");
- cout << "timestamp" << " ";
for (const string &name: dataSet->getColumnNames()) {
cout << name << " ";
}
cout << endl;
- dataSet->setBatchSize(1024);
+ dataSet->setFetchSize(1024);
while (dataSet->hasNext()) {
cout << dataSet->next()->toString();
}
@@ -389,7 +388,7 @@ int main() {
if (errorMessage.find("StorageGroupAlreadySetException") == string::npos) {
cout << errorMessage << endl;
}
- throw e;
+ //throw e;
}
cout << "setStorageGroup: root.sg2\n" << endl;
@@ -401,7 +400,7 @@ int main() {
if (errorMessage.find("StorageGroupAlreadySetException") == string::npos) {
cout << errorMessage << endl;
}
- throw e;
+ //throw e;
}
cout << "createTimeseries\n" << endl;