You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/11 05:43:18 UTC
[iotdb] branch master updated: [IOTDB-1803] Support insert Tablet with null value in c++ client (#4755)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e6781b0 [IOTDB-1803] Support insert Tablet with null value in c++ client (#4755)
e6781b0 is described below
commit e6781b07bd6caf9b534e6a37532f8b5eef27a82a
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Tue Jan 11 13:42:40 2022 +0800
[IOTDB-1803] Support insert Tablet with null value in c++ client (#4755)
---
client-cpp/src/main/Session.cpp | 313 ++++++++++++---------
client-cpp/src/main/Session.h | 198 +++++++++----
.../src/AlignedTimeseriesSessionExample.cpp | 45 ++-
example/client-cpp-example/src/SessionExample.cpp | 48 +++-
.../apache/iotdb/session/util/SessionUtils.java | 15 +-
5 files changed, 417 insertions(+), 202 deletions(-)
diff --git a/client-cpp/src/main/Session.cpp b/client-cpp/src/main/Session.cpp
index 2f4468d..374284b 100644
--- a/client-cpp/src/main/Session.cpp
+++ b/client-cpp/src/main/Session.cpp
@@ -44,7 +44,7 @@ void RpcUtils::verifySuccess(const TSStatus &status) {
}
}
-void RpcUtils::verifySuccess(const vector <TSStatus> &statuses) {
+void RpcUtils::verifySuccess(const vector<TSStatus> &statuses) {
for (TSStatus status: statuses) {
if (status.code != TSStatusCode::SUCCESS_STATUS) {
throw BatchExecutionException(statuses, status.message);
@@ -65,37 +65,37 @@ TSStatus RpcUtils::getStatus(int code, const string &message) {
return status;
}
-shared_ptr <TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
+shared_ptr<TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
return getTSExecuteStatementResp(status);
}
-shared_ptr <TSExecuteStatementResp>
+shared_ptr<TSExecuteStatementResp>
RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, const string &message) {
TSStatus status = getStatus(tsStatusCode, message);
return getTSExecuteStatementResp(status);
}
-shared_ptr <TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(const TSStatus &status) {
- shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+shared_ptr<TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(const TSStatus &status) {
+ shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
TSStatus tsStatus(status);
resp->status = status;
return resp;
}
-shared_ptr <TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode) {
+shared_ptr<TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
return getTSFetchResultsResp(status);
}
-shared_ptr <TSFetchResultsResp>
+shared_ptr<TSFetchResultsResp>
RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, const string &appendMessage) {
TSStatus status = getStatus(tsStatusCode, appendMessage);
return getTSFetchResultsResp(status);
}
-shared_ptr <TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(const TSStatus &status) {
- shared_ptr <TSFetchResultsResp> resp(new TSFetchResultsResp());
+shared_ptr<TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(const TSStatus &status) {
+ shared_ptr<TSFetchResultsResp> resp(new TSFetchResultsResp());
TSStatus tsStatus(status);
resp->__set_status(tsStatus);
return resp;
@@ -103,6 +103,10 @@ shared_ptr <TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(const TSStatus &
void Tablet::reset() {
rowSize = 0;
+ for (int i = 0; i < schemas.size(); i++) {
+ BitMap *bitMap = bitMaps[i].get();
+ bitMap->reset();
+ }
}
void Tablet::createColumns() {
@@ -168,30 +172,51 @@ string SessionUtils::getValue(const Tablet &tablet) {
MyStringBuffer valueBuffer;
for (size_t i = 0; i < tablet.schemas.size(); i++) {
TSDataType::TSDataType dataType = tablet.schemas[i].second;
+ BitMap *bitMap = tablet.bitMaps[i].get();
switch (dataType) {
case TSDataType::BOOLEAN:
for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putBool(tablet.values[i][index] == "true");
+ if (!bitMap->isMarked(index)) {
+ valueBuffer.putBool(tablet.values[i][index] == "true");
+ } else {
+ valueBuffer.putBool(false);
+ }
}
break;
case TSDataType::INT32:
for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putInt(stoi(tablet.values[i][index]));
+ if (!bitMap->isMarked(index)) {
+ valueBuffer.putInt(stoi(tablet.values[i][index]));
+ } else {
+ valueBuffer.putInt((numeric_limits<int>::min)());
+ }
}
break;
case TSDataType::INT64:
for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putLong(stol(tablet.values[i][index]));
+ if (!bitMap->isMarked(index)) {
+ valueBuffer.putLong(stol(tablet.values[i][index]));
+ } else {
+ valueBuffer.putLong((numeric_limits<int64_t>::min)());
+ }
}
break;
case TSDataType::FLOAT:
for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putFloat(stof(tablet.values[i][index]));
+ if (!bitMap->isMarked(index)) {
+ valueBuffer.putFloat(stof(tablet.values[i][index]));
+ } else {
+ valueBuffer.putFloat((numeric_limits<float>::min)());
+ }
}
break;
case TSDataType::DOUBLE:
for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putDouble(stod(tablet.values[i][index]));
+ if (!bitMap->isMarked(index)) {
+ valueBuffer.putDouble(stod(tablet.values[i][index]));
+ } else {
+ valueBuffer.putDouble((numeric_limits<double>::min)());
+ }
}
break;
case TSDataType::TEXT:
@@ -203,6 +228,17 @@ string SessionUtils::getValue(const Tablet &tablet) {
throw UnSupportedDataTypeException(string("Data type ") + to_string(dataType) + " is not supported.");
}
}
+ for (size_t i = 0; i < tablet.schemas.size(); i++) {
+ BitMap *bitMap = tablet.bitMaps[i].get();
+ bool columnHasNull = !bitMap->isAllUnmarked();
+ valueBuffer.putChar(columnHasNull ? (char) 1 : (char) 0);
+ if (columnHasNull) {
+ vector<char> bytes = bitMap->getByteArray();
+ for (int j = 0; j < bytes.size(); j++) {
+ valueBuffer.putChar(bytes[j]);
+ }
+ }
+ }
return valueBuffer.str;
}
@@ -214,21 +250,21 @@ void SessionDataSet::setBatchSize(int batchSize) {
this->batchSize = batchSize;
}
-vector <string> SessionDataSet::getColumnNames() { return this->columnNameList; }
+vector<string> SessionDataSet::getColumnNames() { return this->columnNameList; }
bool SessionDataSet::hasNext() {
if (hasCachedRecord) {
return true;
}
if (!tsQueryDataSetTimeBuffer.hasRemaining()) {
- shared_ptr <TSFetchResultsReq> req(new TSFetchResultsReq());
+ shared_ptr<TSFetchResultsReq> req(new TSFetchResultsReq());
req->__set_sessionId(sessionId);
req->__set_statement(sql);
req->__set_fetchSize(batchSize);
req->__set_queryId(queryId);
req->__set_isAlign(true);
try {
- shared_ptr <TSFetchResultsResp> resp(new TSFetchResultsResp());
+ shared_ptr<TSFetchResultsResp> resp(new TSFetchResultsResp());
client->fetchResults(*resp, *req);
RpcUtils::verifySuccess(resp->status);
@@ -252,7 +288,7 @@ bool SessionDataSet::hasNext() {
}
void SessionDataSet::constructOneRow() {
- vector <Field> outFields;
+ vector<Field> outFields;
int loc = 0;
for (int i = 0; i < columnSize; i++) {
Field field;
@@ -313,7 +349,7 @@ void SessionDataSet::constructOneRow() {
outFields.push_back(field);
}
- if(!this->isIgnoreTimeStamp) {
+ if (!this->isIgnoreTimeStamp) {
rowRecord = RowRecord(tsQueryDataSetTimeBuffer.getLong(), outFields);
} else {
tsQueryDataSetTimeBuffer.getLong();
@@ -340,11 +376,11 @@ RowRecord *SessionDataSet::next() {
}
void SessionDataSet::closeOperationHandle() {
- shared_ptr <TSCloseOperationReq> closeReq(new TSCloseOperationReq());
+ shared_ptr<TSCloseOperationReq> closeReq(new TSCloseOperationReq());
closeReq->__set_sessionId(sessionId);
closeReq->__set_statementId(statementId);
closeReq->__set_queryId(queryId);
- shared_ptr <TSStatus> closeResp(new TSStatus());
+ shared_ptr<TSStatus> closeResp(new TSStatus());
try {
client->closeOperation(*closeResp, *closeReq);
RpcUtils::verifySuccess(*closeResp);
@@ -376,7 +412,7 @@ bool Session::checkSorted(const Tablet &tablet) {
return true;
}
-bool Session::checkSorted(const vector <int64_t> ×) {
+bool Session::checkSorted(const vector<int64_t> ×) {
for (size_t i = 1; i < times.size(); i++) {
if (times[i] < times[i - 1]) {
return false;
@@ -405,8 +441,8 @@ void Session::sortTablet(Tablet &tablet) {
delete[] index;
}
-void Session::sortIndexByTimestamp(int *index, std::vector <int64_t> ×tamps, int length) {
- if ( length <= 1 ) {
+void Session::sortIndexByTimestamp(int *index, std::vector<int64_t> ×tamps, int length) {
+ if (length <= 1) {
return;
}
@@ -436,7 +472,7 @@ void Session::appendValues(string &buffer, const char *value, int size) {
}
void
-Session::putValuesIntoBuffer(const vector <TSDataType::TSDataType> &types, const vector<char *> &values, string &buf) {
+Session::putValuesIntoBuffer(const vector<TSDataType::TSDataType> &types, const vector<char *> &values, string &buf) {
for (size_t i = 0; i < values.size(); i++) {
int8_t typeNum = getDataTypeNumber(types[i]);
buf.append((char *) (&typeNum), sizeof(int8_t));
@@ -501,7 +537,7 @@ void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
return;
}
- shared_ptr <TSocket> socket(new TSocket(host, rpcPort));
+ shared_ptr<TSocket> socket(new TSocket(host, rpcPort));
transport = std::make_shared<TFramedTransport>(socket);
socket->setConnTimeout(connectionTimeoutInMs);
if (!transport->isOpen()) {
@@ -513,10 +549,10 @@ void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
}
}
if (enableRPCCompression) {
- shared_ptr <TCompactProtocol> protocol(new TCompactProtocol(transport));
+ shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(transport));
client = std::make_shared<TSIServiceClient>(protocol);
} else {
- shared_ptr <TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+ shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
client = std::make_shared<TSIServiceClient>(protocol);
}
@@ -557,10 +593,10 @@ void Session::close() {
if (isClosed) {
return;
}
- shared_ptr <TSCloseSessionReq> req(new TSCloseSessionReq());
+ shared_ptr<TSCloseSessionReq> req(new TSCloseSessionReq());
req->__set_sessionId(sessionId);
try {
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
client->closeSession(*resp, *req);
}
catch (exception e) {
@@ -575,8 +611,8 @@ void Session::close() {
void Session::insertRecord(const string &deviceId, int64_t time,
- const vector <string> &measurements,
- const vector <string> &values) {
+ const vector<string> &measurements,
+ const vector<string> &values) {
TSInsertStringRecordReq req;
req.__set_sessionId(sessionId);
req.__set_prefixPath(deviceId);
@@ -595,8 +631,8 @@ void Session::insertRecord(const string &deviceId, int64_t time,
}
void Session::insertRecord(const string &prefixPath, int64_t time,
- const vector <string> &measurements,
- const vector <TSDataType::TSDataType> &types,
+ const vector<string> &measurements,
+ const vector<TSDataType::TSDataType> &types,
const vector<char *> &values) {
TSInsertRecordReq req;
req.__set_sessionId(sessionId);
@@ -617,8 +653,8 @@ void Session::insertRecord(const string &prefixPath, int64_t time,
}
void Session::insertAlignedRecord(const string &deviceId, int64_t time,
- const vector <string> &measurements,
- const vector <string> &values) {
+ const vector<string> &measurements,
+ const vector<string> &values) {
TSInsertStringRecordReq req;
req.__set_sessionId(sessionId);
req.__set_prefixPath(deviceId);
@@ -637,9 +673,9 @@ void Session::insertAlignedRecord(const string &deviceId, int64_t time,
}
void Session::insertAlignedRecord(const string &prefixPath, int64_t time,
- const vector <string> &measurements,
- const vector <TSDataType::TSDataType> &types,
- const vector<char *> &values) {
+ const vector<string> &measurements,
+ const vector<TSDataType::TSDataType> &types,
+ const vector<char *> &values) {
TSInsertRecordReq req;
req.__set_sessionId(sessionId);
req.__set_prefixPath(prefixPath);
@@ -658,10 +694,10 @@ void Session::insertAlignedRecord(const string &prefixPath, int64_t time,
}
}
-void Session::insertRecords(const vector <string> &deviceIds,
- const vector <int64_t> ×,
- const vector <vector<string>> &measurementsList,
- const vector <vector<string>> &valuesList) {
+void Session::insertRecords(const vector<string> &deviceIds,
+ const vector<int64_t> ×,
+ const vector<vector<string>> &measurementsList,
+ const vector<vector<string>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
@@ -685,11 +721,11 @@ void Session::insertRecords(const vector <string> &deviceIds,
}
}
-void Session::insertRecords(const vector <string> &deviceIds,
- const vector <int64_t> ×,
- const vector <vector<string>> &measurementsList,
- const vector <vector<TSDataType::TSDataType>> &typesList,
- const vector <vector<char *>> &valuesList) {
+void Session::insertRecords(const vector<string> &deviceIds,
+ const vector<int64_t> ×,
+ const vector<vector<string>> &measurementsList,
+ const vector<vector<TSDataType::TSDataType>> &typesList,
+ const vector<vector<char *>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
@@ -700,7 +736,7 @@ void Session::insertRecords(const vector <string> &deviceIds,
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
- vector <string> bufferList;
+ vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
@@ -718,10 +754,10 @@ void Session::insertRecords(const vector <string> &deviceIds,
}
}
-void Session::insertAlignedRecords(const vector <string> &deviceIds,
- const vector <int64_t> ×,
- const vector <vector<string>> &measurementsList,
- const vector <vector<string>> &valuesList) {
+void Session::insertAlignedRecords(const vector<string> &deviceIds,
+ const vector<int64_t> ×,
+ const vector<vector<string>> &measurementsList,
+ const vector<vector<string>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
@@ -745,11 +781,11 @@ void Session::insertAlignedRecords(const vector <string> &deviceIds,
}
}
-void Session::insertAlignedRecords(const vector <string> &deviceIds,
- const vector <int64_t> ×,
- const vector <vector<string>> &measurementsList,
- const vector <vector<TSDataType::TSDataType>> &typesList,
- const vector <vector<char *>> &valuesList) {
+void Session::insertAlignedRecords(const vector<string> &deviceIds,
+ const vector<int64_t> ×,
+ const vector<vector<string>> &measurementsList,
+ const vector<vector<TSDataType::TSDataType>> &typesList,
+ const vector<vector<char *>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's size should be equal");
@@ -760,7 +796,7 @@ void Session::insertAlignedRecords(const vector <string> &deviceIds,
request.__set_prefixPaths(deviceIds);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
- vector <string> bufferList;
+ vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
@@ -779,18 +815,18 @@ void Session::insertAlignedRecords(const vector <string> &deviceIds,
}
void Session::insertRecordsOfOneDevice(const string &deviceId,
- vector <int64_t> ×,
- vector <vector<string>> &measurementsList,
- vector <vector<TSDataType::TSDataType>> &typesList,
- vector <vector<char *>> &valuesList) {
+ vector<int64_t> ×,
+ vector<vector<string>> &measurementsList,
+ vector<vector<TSDataType::TSDataType>> &typesList,
+ vector<vector<char *>> &valuesList) {
insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
}
void Session::insertRecordsOfOneDevice(const string &deviceId,
- vector <int64_t> ×,
- vector <vector<string>> &measurementsList,
- vector <vector<TSDataType::TSDataType>> &typesList,
- vector <vector<char *>> &valuesList,
+ vector<int64_t> ×,
+ vector<vector<string>> &measurementsList,
+ vector<vector<TSDataType::TSDataType>> &typesList,
+ vector<vector<char *>> &valuesList,
bool sorted) {
if (sorted) {
@@ -815,7 +851,7 @@ void Session::insertRecordsOfOneDevice(const string &deviceId,
request.__set_prefixPath(deviceId);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
- vector <string> bufferList;
+ vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
@@ -834,19 +870,19 @@ void Session::insertRecordsOfOneDevice(const string &deviceId,
}
void Session::insertAlignedRecordsOfOneDevice(const string &deviceId,
- vector <int64_t> ×,
- vector <vector<string>> &measurementsList,
- vector <vector<TSDataType::TSDataType>> &typesList,
- vector <vector<char *>> &valuesList) {
+ vector<int64_t> ×,
+ vector<vector<string>> &measurementsList,
+ vector<vector<TSDataType::TSDataType>> &typesList,
+ vector<vector<char *>> &valuesList) {
insertAlignedRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
}
void Session::insertAlignedRecordsOfOneDevice(const string &deviceId,
- vector <int64_t> ×,
- vector <vector<string>> &measurementsList,
- vector <vector<TSDataType::TSDataType>> &typesList,
- vector <vector<char *>> &valuesList,
- bool sorted) {
+ vector<int64_t> ×,
+ vector<vector<string>> &measurementsList,
+ vector<vector<TSDataType::TSDataType>> &typesList,
+ vector<vector<char *>> &valuesList,
+ bool sorted) {
if (sorted) {
if (!checkSorted(times)) {
@@ -870,7 +906,7 @@ void Session::insertAlignedRecordsOfOneDevice(const string &deviceId,
request.__set_prefixPath(deviceId);
request.__set_timestamps(times);
request.__set_measurementsList(measurementsList);
- vector <string> bufferList;
+ vector<string> bufferList;
for (size_t i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
@@ -910,7 +946,7 @@ void Session::insertTablet(Tablet &tablet, bool sorted) {
TSInsertTabletReq request;
request.__set_sessionId(sessionId);
request.prefixPath = tablet.deviceId;
- for (pair <string, TSDataType::TSDataType> schema: tablet.schemas) {
+ for (pair<string, TSDataType::TSDataType> schema: tablet.schemas) {
request.measurements.push_back(schema.first);
request.types.push_back(schema.second);
}
@@ -974,9 +1010,9 @@ void Session::insertTablets(map<string, Tablet *> &tablets, bool sorted) {
sortTablet(*(item.second));
}
request.prefixPaths.push_back(item.second->deviceId);
- vector <string> measurements;
+ vector<string> measurements;
vector<int> dataTypes;
- for (pair <string, TSDataType::TSDataType> schema: item.second->schemas) {
+ for (pair<string, TSDataType::TSDataType> schema: item.second->schemas) {
measurements.push_back(schema.first);
dataTypes.push_back(schema.second);
}
@@ -1002,7 +1038,7 @@ void Session::insertAlignedTablets(map<string, Tablet *> &tablets) {
}
void Session::insertAlignedTablets(map<string, Tablet *> &tablets, bool sorted) {
- for(map<string, Tablet *>::iterator iter = tablets.begin(); iter != tablets.end(); iter++) {
+ for (map<string, Tablet *>::iterator iter = tablets.begin(); iter != tablets.end(); iter++) {
iter->second->setAligned(true);
}
try {
@@ -1014,15 +1050,15 @@ void Session::insertAlignedTablets(map<string, Tablet *> &tablets, bool sorted)
}
}
-void Session::testInsertRecord(const string &deviceId, int64_t time, const vector <string> &measurements,
- const vector <string> &values) {
- shared_ptr <TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
+void Session::testInsertRecord(const string &deviceId, int64_t time, const vector<string> &measurements,
+ const vector<string> &values) {
+ shared_ptr<TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
req->__set_sessionId(sessionId);
req->__set_prefixPath(deviceId);
req->__set_timestamp(time);
req->__set_measurements(measurements);
req->__set_values(values);
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
try {
client->insertStringRecord(*resp, *req);
RpcUtils::verifySuccess(*resp);
@@ -1033,10 +1069,10 @@ void Session::testInsertRecord(const string &deviceId, int64_t time, const vecto
}
void Session::testInsertTablet(const Tablet &tablet) {
- shared_ptr <TSInsertTabletReq> request(new TSInsertTabletReq());
+ shared_ptr<TSInsertTabletReq> request(new TSInsertTabletReq());
request->__set_sessionId(sessionId);
request->prefixPath = tablet.deviceId;
- for (pair <string, TSDataType::TSDataType> schema: tablet.schemas) {
+ for (pair<string, TSDataType::TSDataType> schema: tablet.schemas) {
request->measurements.push_back(schema.first);
request->types.push_back(schema.second);
}
@@ -1045,7 +1081,7 @@ void Session::testInsertTablet(const Tablet &tablet) {
request->__set_size(tablet.rowSize);
try {
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
client->testInsertTablet(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
@@ -1054,16 +1090,16 @@ void Session::testInsertTablet(const Tablet &tablet) {
}
}
-void Session::testInsertRecords(const vector <string> &deviceIds,
- const vector <int64_t> ×,
- const vector <vector<string>> &measurementsList,
- const vector <vector<string>> &valuesList) {
+void Session::testInsertRecords(const vector<string> &deviceIds,
+ const vector<int64_t> ×,
+ const vector<vector<string>> &measurementsList,
+ const vector<vector<string>> &valuesList) {
size_t len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
logic_error error("deviceIds, times, measurementsList and valuesList's size should be equal");
throw exception(error);
}
- shared_ptr <TSInsertStringRecordsReq> request(new TSInsertStringRecordsReq());
+ shared_ptr<TSInsertStringRecordsReq> request(new TSInsertStringRecordsReq());
request->__set_sessionId(sessionId);
request->__set_prefixPaths(deviceIds);
request->__set_timestamps(times);
@@ -1071,7 +1107,7 @@ void Session::testInsertRecords(const vector <string> &deviceIds,
request->__set_valuesList(valuesList);
try {
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
client->insertStringRecords(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
@@ -1081,13 +1117,13 @@ void Session::testInsertRecords(const vector <string> &deviceIds,
}
void Session::deleteTimeseries(const string &path) {
- vector <string> paths;
+ vector<string> paths;
paths.push_back(path);
deleteTimeseries(paths);
}
-void Session::deleteTimeseries(const vector <string> &paths) {
- shared_ptr <TSStatus> resp(new TSStatus());
+void Session::deleteTimeseries(const vector<string> &paths) {
+ shared_ptr<TSStatus> resp(new TSStatus());
try {
client->deleteTimeseries(*resp, sessionId, paths);
RpcUtils::verifySuccess(*resp);
@@ -1098,17 +1134,17 @@ void Session::deleteTimeseries(const vector <string> &paths) {
}
void Session::deleteData(const string &path, int64_t time) {
- vector <string> paths;
+ vector<string> paths;
paths.push_back(path);
deleteData(paths, time);
}
-void Session::deleteData(const vector <string> &deviceId, int64_t time) {
- shared_ptr <TSDeleteDataReq> req(new TSDeleteDataReq());
+void Session::deleteData(const vector<string> &deviceId, int64_t time) {
+ shared_ptr<TSDeleteDataReq> req(new TSDeleteDataReq());
req->__set_sessionId(sessionId);
req->__set_paths(deviceId);
req->__set_endTime(time);
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
try {
client->deleteData(*resp, *req);
RpcUtils::verifySuccess(*resp);
@@ -1119,7 +1155,7 @@ void Session::deleteData(const vector <string> &deviceId, int64_t time) {
}
void Session::setStorageGroup(const string &storageGroupId) {
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
try {
client->setStorageGroup(*resp, sessionId, storageGroupId);
RpcUtils::verifySuccess(*resp);
@@ -1130,13 +1166,13 @@ void Session::setStorageGroup(const string &storageGroupId) {
}
void Session::deleteStorageGroup(const string &storageGroup) {
- vector <string> storageGroups;
+ vector<string> storageGroups;
storageGroups.push_back(storageGroup);
deleteStorageGroups(storageGroups);
}
-void Session::deleteStorageGroups(const vector <string> &storageGroups) {
- shared_ptr <TSStatus> resp(new TSStatus());
+void Session::deleteStorageGroups(const vector<string> &storageGroups) {
+ shared_ptr<TSStatus> resp(new TSStatus());
try {
client->deleteStorageGroups(*resp, sessionId, storageGroups);
RpcUtils::verifySuccess(*resp);
@@ -1163,11 +1199,11 @@ void Session::createTimeseries(const string &path,
TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
CompressionType::CompressionType compressor,
- map <string, string> *props,
- map <string, string> *tags,
- map <string, string> *attributes,
+ map<string, string> *props,
+ map<string, string> *tags,
+ map<string, string> *attributes,
const string &measurementAlias) {
- shared_ptr <TSCreateTimeseriesReq> req(new TSCreateTimeseriesReq());
+ shared_ptr<TSCreateTimeseriesReq> req(new TSCreateTimeseriesReq());
req->__set_sessionId(sessionId);
req->__set_path(path);
req->__set_dataType(dataType);
@@ -1187,7 +1223,7 @@ void Session::createTimeseries(const string &path,
req->__set_measurementAlias(measurementAlias);
}
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
try {
client->createTimeseries(*resp, *req);
RpcUtils::verifySuccess(*resp);
@@ -1197,15 +1233,15 @@ void Session::createTimeseries(const string &path,
}
}
-void Session::createMultiTimeseries(const vector <string> &paths,
- const vector <TSDataType::TSDataType> &dataTypes,
- const vector <TSEncoding::TSEncoding> &encodings,
- const vector <CompressionType::CompressionType> &compressors,
- vector <map<string, string>> *propsList,
- vector <map<string, string>> *tagsList,
- vector <map<string, string>> *attributesList,
- vector <string> *measurementAliasList) {
- shared_ptr <TSCreateMultiTimeseriesReq> request(new TSCreateMultiTimeseriesReq());
+void Session::createMultiTimeseries(const vector<string> &paths,
+ const vector<TSDataType::TSDataType> &dataTypes,
+ const vector<TSEncoding::TSEncoding> &encodings,
+ const vector<CompressionType::CompressionType> &compressors,
+ vector<map<string, string>> *propsList,
+ vector<map<string, string>> *tagsList,
+ vector<map<string, string>> *attributesList,
+ vector<string> *measurementAliasList) {
+ shared_ptr<TSCreateMultiTimeseriesReq> request(new TSCreateMultiTimeseriesReq());
request->__set_sessionId(sessionId);
request->__set_paths(paths);
@@ -1242,7 +1278,7 @@ void Session::createMultiTimeseries(const vector <string> &paths,
}
try {
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
client->createMultiTimeseries(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
@@ -1252,11 +1288,11 @@ void Session::createMultiTimeseries(const vector <string> &paths,
}
void Session::createAlignedTimeseries(const std::string &deviceId,
- const std::vector <std::string> &measurements,
- const std::vector <TSDataType::TSDataType> &dataTypes,
- const std::vector <TSEncoding::TSEncoding> &encodings,
- const std::vector <CompressionType::CompressionType> &compressors) {
- shared_ptr <TSCreateAlignedTimeseriesReq> request(new TSCreateAlignedTimeseriesReq());
+ const std::vector<std::string> &measurements,
+ const std::vector<TSDataType::TSDataType> &dataTypes,
+ const std::vector<TSEncoding::TSEncoding> &encodings,
+ const std::vector<CompressionType::CompressionType> &compressors) {
+ shared_ptr<TSCreateAlignedTimeseriesReq> request(new TSCreateAlignedTimeseriesReq());
request->__set_sessionId(sessionId);
request->__set_prefixPath(deviceId);
request->__set_measurements(measurements);
@@ -1280,7 +1316,7 @@ void Session::createAlignedTimeseries(const std::string &deviceId,
request->__set_compressors(compressorsOrdinal);
try {
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
client->createAlignedTimeseries(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
@@ -1291,7 +1327,7 @@ void Session::createAlignedTimeseries(const std::string &deviceId,
bool Session::checkTimeseriesExists(const string &path) {
try {
- std::unique_ptr <SessionDataSet> dataset = executeQueryStatement("SHOW TIMESERIES " + path);
+ std::unique_ptr<SessionDataSet> dataset = executeQueryStatement("SHOW TIMESERIES " + path);
bool isExisted = dataset->hasNext();
dataset->closeOperationHandle();
return isExisted;
@@ -1306,7 +1342,7 @@ string Session::getTimeZone() {
if (zoneId != "") {
return zoneId;
}
- shared_ptr <TSGetTimeZoneResp> resp(new TSGetTimeZoneResp());
+ shared_ptr<TSGetTimeZoneResp> resp(new TSGetTimeZoneResp());
try {
client->getTimeZone(*resp, sessionId);
RpcUtils::verifySuccess(resp->status);
@@ -1318,10 +1354,10 @@ string Session::getTimeZone() {
}
void Session::setTimeZone(const string &zoneId) {
- shared_ptr <TSSetTimeZoneReq> req(new TSSetTimeZoneReq());
+ shared_ptr<TSSetTimeZoneReq> req(new TSSetTimeZoneReq());
req->__set_sessionId(sessionId);
req->__set_timeZone(zoneId);
- shared_ptr <TSStatus> resp(new TSStatus());
+ shared_ptr<TSStatus> resp(new TSStatus());
try {
client->setTimeZone(*resp, *req);
}
@@ -1332,13 +1368,13 @@ void Session::setTimeZone(const string &zoneId) {
this->zoneId = zoneId;
}
-unique_ptr <SessionDataSet> Session::executeQueryStatement(const string &sql) {
- shared_ptr <TSExecuteStatementReq> req(new TSExecuteStatementReq());
+unique_ptr<SessionDataSet> Session::executeQueryStatement(const string &sql) {
+ shared_ptr<TSExecuteStatementReq> req(new TSExecuteStatementReq());
req->__set_sessionId(sessionId);
req->__set_statementId(statementId);
req->__set_statement(sql);
req->__set_fetchSize(fetchSize);
- shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+ shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
try {
client->executeStatement(*resp, *req);
RpcUtils::verifySuccess(resp->status);
@@ -1346,17 +1382,18 @@ unique_ptr <SessionDataSet> Session::executeQueryStatement(const string &sql) {
catch (IoTDBConnectionException e) {
throw IoTDBConnectionException(e.what());
}
- shared_ptr <TSQueryDataSet> queryDataSet(new TSQueryDataSet(resp->queryDataSet));
+ shared_ptr<TSQueryDataSet> queryDataSet(new TSQueryDataSet(resp->queryDataSet));
return unique_ptr<SessionDataSet>(new SessionDataSet(
- sql, resp->columns, resp->dataTypeList, resp->columnNameIndexMap, resp->ignoreTimeStamp, resp->queryId, statementId, client, sessionId, queryDataSet));
+ sql, resp->columns, resp->dataTypeList, resp->columnNameIndexMap, resp->ignoreTimeStamp, resp->queryId,
+ statementId, client, sessionId, queryDataSet));
}
void Session::executeNonQueryStatement(const string &sql) {
- shared_ptr <TSExecuteStatementReq> req(new TSExecuteStatementReq());
+ shared_ptr<TSExecuteStatementReq> req(new TSExecuteStatementReq());
req->__set_sessionId(sessionId);
req->__set_statementId(statementId);
req->__set_statement(sql);
- shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+ shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
try {
client->executeUpdateStatement(*resp, *req);
RpcUtils::verifySuccess(resp->status);
diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index c4b3b01..7ea7413 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -192,42 +192,42 @@ public:
static void verifySuccess(const TSStatus &status);
- static void verifySuccess(const std::vector<TSStatus> &statuses);
+ static void verifySuccess(const std::vector <TSStatus> &statuses);
static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
static TSStatus getStatus(int code, const std::string &message);
- static std::shared_ptr<TSExecuteStatementResp> getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
+ static std::shared_ptr <TSExecuteStatementResp> getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
- static std::shared_ptr<TSExecuteStatementResp>
+ static std::shared_ptr <TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, const std::string &message);
- static std::shared_ptr<TSExecuteStatementResp> getTSExecuteStatementResp(const TSStatus &status);
+ static std::shared_ptr <TSExecuteStatementResp> getTSExecuteStatementResp(const TSStatus &status);
- static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
+ static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
- static std::shared_ptr<TSFetchResultsResp>
+ static std::shared_ptr <TSFetchResultsResp>
getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, const std::string &appendMessage);
- static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(const TSStatus &status);
+ static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(const TSStatus &status);
};
// Simulate the ByteBuffer class in Java
class MyStringBuffer {
public:
- MyStringBuffer():pos(0) {
+ MyStringBuffer() : pos(0) {
checkBigEndian();
}
- MyStringBuffer(std::string str):str(str),pos(0) {
+ MyStringBuffer(std::string str) : str(str), pos(0) {
checkBigEndian();
}
bool hasRemaining() {
return pos < str.size();
}
-
+
int getInt() {
return *(int *) getOrderedByte(4);
}
@@ -280,7 +280,7 @@ public:
}
void putBool(bool ins) {
- char tmp = ins? 1 : 0;
+ char tmp = ins ? 1 : 0;
str += tmp;
}
@@ -317,8 +317,7 @@ private:
void putOrderedByte(char *buf, int len) {
if (isBigEndian) {
str.assign(buf, len);
- }
- else {
+ } else {
for (int i = len - 1; i > -1; i--) {
str += buf[i];
}
@@ -330,6 +329,98 @@ private:
char numericBuf[8]; //only be used by int, long, float, double etc.
};
+class BitMap {
+public:
+ /** Initialize a BitMap with given size. */
+ BitMap(size_t size) {
+ this->size = size;
+ this->bits.resize((size >> 3) + 1); // equal to "size/8 + 1"
+ std::fill(bits.begin(), bits.end(), (char) 0);
+ }
+
+ /** mark as 1 at the given bit position. */
+ void mark(int position) {
+ bits[position >> 3] |= BIT_UTIL[position % 8];
+ }
+
+ /** mark as 0 at the given bit position. */
+ void unmark(int position) {
+ bits[position >> 3] &= UNMARK_BIT_UTIL[position % 8];
+ }
+
+ /** mark as 1 at all positions. */
+ void markAll() {
+ std::fill(bits.begin(), bits.end(), (char) 0XFF);
+ }
+
+ /** mark as 0 at all positions. */
+ void reset() {
+ std::fill(bits.begin(), bits.end(), (char) 0);
+ }
+
+ /** returns the value of the bit with the specified index. */
+ bool isMarked(int position) {
+ return (bits[position >> 3] & BIT_UTIL[position % 8]) != 0;
+ }
+
+ /** whether all bits are zero, i.e., no Null value */
+ bool isAllUnmarked() {
+ int j;
+ for (j = 0; j < size >> 3; j++) {
+ if (bits[j] != (char) 0) {
+ return false;
+ }
+ }
+ for (j = 0; j < size % 8; j++) {
+ if ((bits[size >> 3] & BIT_UTIL[j]) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /** whether all bits are one, i.e., all are Null */
+ bool isAllMarked() {
+ int j;
+ for (j = 0; j < size >> 3; j++) {
+ if (bits[j] != (char) 0XFF) {
+ return false;
+ }
+ }
+ for (j = 0; j < size % 8; j++) {
+ if ((bits[size >> 3] & BIT_UTIL[j]) == 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ std::vector<char> getByteArray() {
+ return this->bits;
+ }
+
+ size_t getSize() {
+ return this->size;
+ }
+
+private:
+ std::vector<char> BIT_UTIL = {
+ (char) 1, (char) 2, (char) 4, (char) 8, (char) 16, (char) 32, (char) 64, (char) -128
+ };
+ std::vector<char> UNMARK_BIT_UTIL = {
+ (char) 0XFE, // 11111110
+ (char) 0XFD, // 11111101
+ (char) 0XFB, // 11111011
+ (char) 0XF7, // 11110111
+ (char) 0XEF, // 11101111
+ (char) 0XDF, // 11011111
+ (char) 0XBF, // 10111111
+ (char) 0X7F // 01111111
+ };
+
+ std::vector<char> bits;
+ size_t size;
+};
class Field {
public:
@@ -368,8 +459,9 @@ private:
public:
std::string deviceId; // deviceId of this tablet
std::vector <std::pair<std::string, TSDataType::TSDataType>> schemas; // the list of measurement schemas for creating the tablet
- std::vector <int64_t> timestamps; //timestamps in this tablet
- std::vector <std::vector<std::string>> values;
+ std::vector <int64_t> timestamps; // timestamps in this tablet
+ std::vector <std::vector<std::string>> values; // each object is a primitive type array, which represents values of one measurement
+ std::vector <std::unique_ptr<BitMap>> bitMaps; // each bitmap represents the existence of each value in the current column
int rowSize; //the number of rows to include in this tablet
int maxRowNumber; // the maximum number of rows for this tablet
bool isAligned; // whether this tablet store data of aligned timeseries or not
@@ -383,7 +475,8 @@ public:
* @param deviceId the name of the device specified to be written in
* @param timeseries the list of measurement schemas for creating the tablet
*/
- Tablet(const std::string &deviceId, const std::vector <std::pair<std::string, TSDataType::TSDataType>> ×eries) {
+ Tablet(const std::string &deviceId,
+ const std::vector <std::pair<std::string, TSDataType::TSDataType>> ×eries) {
Tablet(deviceId, timeseries, DEFAULT_SIZE);
}
@@ -398,19 +491,22 @@ public:
* @param maxRowNumber the maximum number of rows for this tablet
*/
Tablet(const std::string &deviceId, const std::vector <std::pair<std::string, TSDataType::TSDataType>> &schemas,
- int maxRowNumber, bool isAligned_ = false) : deviceId(deviceId), schemas(schemas), maxRowNumber(maxRowNumber), isAligned(isAligned_){
+ int maxRowNumber, bool _isAligned = false) : deviceId(deviceId), schemas(schemas),
+ maxRowNumber(maxRowNumber), isAligned(_isAligned) {
// create timestamp column
timestamps.resize(maxRowNumber);
- // create value columns
+ // create value columns and bitMaps
values.resize(schemas.size());
+ bitMaps.resize(schemas.size());
for (size_t i = 0; i < schemas.size(); i++) {
values[i].resize(maxRowNumber);
+ bitMaps[i] = std::unique_ptr<BitMap>(new BitMap(maxRowNumber));
}
-
this->rowSize = 0;
}
void reset(); // Reset Tablet to the default state - set the rowSize to 0
+
void createColumns();
int getTimeBytesSize();
@@ -454,12 +550,12 @@ public:
std::string toString() {
std::string ret = "";
- if(this->timestamp != -1) {
+ if (this->timestamp != -1) {
ret.append(std::to_string(timestamp));
ret.append("\t");
}
for (size_t i = 0; i < fields.size(); i++) {
- if(i != 0) {
+ if (i != 0) {
ret.append("\t");
}
TSDataType::TSDataType dataType = fields[i].dataType;
@@ -531,8 +627,8 @@ public:
SessionDataSet() {}
SessionDataSet(const std::string &sql,
- const std::vector<std::string> &columnNameList,
- const std::vector<std::string> &columnTypeList,
+ const std::vector <std::string> &columnNameList,
+ const std::vector <std::string> &columnTypeList,
std::map<std::string, int> &columnNameIndexMap,
bool isIgnoreTimeStamp,
int64_t queryId, int64_t statementId,
@@ -557,11 +653,13 @@ public:
this->columnMap[name] = i;
this->columnTypeDeduplicatedList.push_back(columnTypeList[i]);
}
- if(!columnNameIndexMap.empty()) {
+ if (!columnNameIndexMap.empty()) {
this->valueBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->valueList[columnNameIndexMap[name]])));
+ std::unique_ptr<MyStringBuffer>(
+ new MyStringBuffer(queryDataSet->valueList[columnNameIndexMap[name]])));
this->bitmapBuffers.push_back(
- std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->bitmapList[columnNameIndexMap[name]])));
+ std::unique_ptr<MyStringBuffer>(
+ new MyStringBuffer(queryDataSet->bitmapList[columnNameIndexMap[name]])));
} else {
this->valueBuffers.push_back(
std::unique_ptr<MyStringBuffer>(new MyStringBuffer(queryDataSet->valueList[columnMap[name]])));
@@ -637,14 +735,17 @@ private:
void appendValues(std::string &buffer, const char *value, int size);
void
- putValuesIntoBuffer(const std::vector <TSDataType::TSDataType> &types, const std::vector<char *> &values, std::string &buf);
+ putValuesIntoBuffer(const std::vector <TSDataType::TSDataType> &types, const std::vector<char *> &values,
+ std::string &buf);
int8_t getDataTypeNumber(TSDataType::TSDataType type);
struct TsCompare {
std::vector <int64_t> ×tamps;
- TsCompare(std::vector <int64_t> &inTimestamps):timestamps(inTimestamps) {};
- bool operator() (int i, int j) { return (timestamps[i] < timestamps[j]) ;};
+
+ TsCompare(std::vector <int64_t> &inTimestamps) : timestamps(inTimestamps) {};
+
+ bool operator()(int i, int j) { return (timestamps[i] < timestamps[j]); };
};
public:
@@ -662,7 +763,8 @@ public:
this->zoneId = "UTC+08:00";
}
- Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password, int fetchSize) {
+ Session(const std::string &host, int rpcPort, const std::string &username, const std::string &password,
+ int fetchSize) {
this->host = host;
this->rpcPort = rpcPort;
this->username = username;
@@ -698,10 +800,10 @@ public:
const std::vector <TSDataType::TSDataType> &types, const std::vector<char *> &values);
void insertAlignedRecord(const std::string &deviceId, int64_t time, const std::vector <std::string> &measurements,
- const std::vector <std::string> &values);
+ const std::vector <std::string> &values);
void insertAlignedRecord(const std::string &deviceId, int64_t time, const std::vector <std::string> &measurements,
- const std::vector <TSDataType::TSDataType> &types, const std::vector<char *> &values);
+ const std::vector <TSDataType::TSDataType> &types, const std::vector<char *> &values);
void insertRecords(const std::vector <std::string> &deviceIds,
const std::vector <int64_t> ×,
@@ -715,15 +817,15 @@ public:
const std::vector <std::vector<char *>> &valuesList);
void insertAlignedRecords(const std::vector <std::string> &deviceIds,
- const std::vector <int64_t> ×,
- const std::vector <std::vector<std::string>> &measurementsList,
- const std::vector <std::vector<std::string>> &valuesList);
+ const std::vector <int64_t> ×,
+ const std::vector <std::vector<std::string>> &measurementsList,
+ const std::vector <std::vector<std::string>> &valuesList);
void insertAlignedRecords(const std::vector <std::string> &deviceIds,
- const std::vector <int64_t> ×,
- const std::vector <std::vector<std::string>> &measurementsList,
- const std::vector <std::vector<TSDataType::TSDataType>> &typesList,
- const std::vector <std::vector<char *>> &valuesList);
+ const std::vector <int64_t> ×,
+ const std::vector <std::vector<std::string>> &measurementsList,
+ const std::vector <std::vector<TSDataType::TSDataType>> &typesList,
+ const std::vector <std::vector<char *>> &valuesList);
void insertRecordsOfOneDevice(const std::string &deviceId,
std::vector <int64_t> ×,
@@ -739,17 +841,17 @@ public:
bool sorted);
void insertAlignedRecordsOfOneDevice(const std::string &deviceId,
- std::vector <int64_t> ×,
- std::vector <std::vector<std::string>> &measurementsList,
- std::vector <std::vector<TSDataType::TSDataType>> &typesList,
- std::vector <std::vector<char *>> &valuesList);
+ std::vector <int64_t> ×,
+ std::vector <std::vector<std::string>> &measurementsList,
+ std::vector <std::vector<TSDataType::TSDataType>> &typesList,
+ std::vector <std::vector<char *>> &valuesList);
void insertAlignedRecordsOfOneDevice(const std::string &deviceId,
- std::vector <int64_t> ×,
- std::vector <std::vector<std::string>> &measurementsList,
- std::vector <std::vector<TSDataType::TSDataType>> &typesList,
- std::vector <std::vector<char *>> &valuesList,
- bool sorted);
+ std::vector <int64_t> ×,
+ std::vector <std::vector<std::string>> &measurementsList,
+ std::vector <std::vector<TSDataType::TSDataType>> &typesList,
+ std::vector <std::vector<char *>> &valuesList,
+ bool sorted);
void insertTablet(Tablet &tablet);
diff --git a/example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp b/example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp
index 4d78f04..345c09b 100644
--- a/example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp
+++ b/example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp
@@ -190,6 +190,46 @@ void insertAlignedTablets() {
}
}
+void insertNullableTabletWithAlignedTimeseries() {
+ pair<string, TSDataType::TSDataType> pairA("s1", TSDataType::INT32);
+ pair<string, TSDataType::TSDataType> pairB("s2", TSDataType::DOUBLE);
+ pair<string, TSDataType::TSDataType> pairC("s3", TSDataType::BOOLEAN);
+ vector<pair<string, TSDataType::TSDataType>> schemas;
+ schemas.push_back(pairA);
+ schemas.push_back(pairB);
+ schemas.push_back(pairC);
+
+ Tablet tablet("root.sg1.d1", schemas, 10);
+ tablet.setAligned(true);
+
+ for (int64_t time = 40; time < 50; time++) {
+ int row = tablet.rowSize++;
+ tablet.timestamps[row] = time;
+ for (int i = 0; i < 3; i++) {
+ if (i == 0) {
+ tablet.values[i][row] = "1";
+ } else if (i == 1) {
+ tablet.values[i][row] = "1.0";
+ } else {
+ tablet.values[i][row] = "true";
+ }
+ // mark null value
+ if (row % 3 == i) {
+ tablet.bitMaps[i]->mark(row);
+ }
+ }
+ if (tablet.rowSize == tablet.maxRowNumber) {
+ session->insertTablet(tablet, true);
+ tablet.reset();
+ }
+ }
+
+ if (tablet.rowSize != 0) {
+ session->insertTablet(tablet);
+ tablet.reset();
+ }
+}
+
void query() {
unique_ptr<SessionDataSet> dataSet = session->executeQueryStatement("select * from root.sg1.d1");
cout << "timestamp" << " ";
@@ -209,7 +249,7 @@ void query() {
void deleteData() {
string path = "root.sg1.d1.s1";
- int64_t deleteTime = 39;
+ int64_t deleteTime = 49;
session->deleteData(path, deleteTime);
}
@@ -260,6 +300,9 @@ int main() {
cout << "insertAlignedTablets\n" << endl;
insertAlignedTablets();
+ cout << "insertNullableTabletWithAlignedTimeseries\n" << endl;
+ insertNullableTabletWithAlignedTimeseries();
+
cout << "query\n" << endl;
query();
diff --git a/example/client-cpp-example/src/SessionExample.cpp b/example/client-cpp-example/src/SessionExample.cpp
index 72e37ed..4ebac73 100644
--- a/example/client-cpp-example/src/SessionExample.cpp
+++ b/example/client-cpp-example/src/SessionExample.cpp
@@ -227,12 +227,53 @@ void insertTablets() {
}
}
+void insertTabletWithNullValues() {
+ /*
+ * A Tablet example:
+ * device1
+ * time s1, s2, s3
+ * 1, null, 1, 1
+ * 2, 2, null, 2
+ * 3, 3, 3, null
+ */
+ pair<string, TSDataType::TSDataType> pairA("s1", TSDataType::INT64);
+ pair<string, TSDataType::TSDataType> pairB("s2", TSDataType::INT64);
+ pair<string, TSDataType::TSDataType> pairC("s3", TSDataType::INT64);
+ vector<pair<string, TSDataType::TSDataType>> schemas;
+ schemas.push_back(pairA);
+ schemas.push_back(pairB);
+ schemas.push_back(pairC);
+
+ Tablet tablet("root.sg1.d1", schemas, 10);
+
+ for (int64_t time = 40; time < 50; time++) {
+ int row = tablet.rowSize++;
+ tablet.timestamps[row] = time;
+ for (int i = 0; i < 3; i++) {
+ tablet.values[i][row] = to_string(i);
+ // mark null value
+ if (row % 3 == i) {
+ tablet.bitMaps[i]->mark(row);
+ }
+ }
+ if (tablet.rowSize == tablet.maxRowNumber) {
+ session->insertTablet(tablet, true);
+ tablet.reset();
+ }
+ }
+
+ if (tablet.rowSize != 0) {
+ session->insertTablet(tablet);
+ tablet.reset();
+ }
+}
+
void nonQuery() {
- session->executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(50, 1);");
+ session->executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(100, 1);");
}
void query() {
- unique_ptr<SessionDataSet> dataSet = session->executeQueryStatement("select * from root.sg1.d1");
+ unique_ptr<SessionDataSet> dataSet = session->executeQueryStatement("select s1, s2, s3 from root.sg1.d1");
cout << "timestamp" << " ";
for (const string &name: dataSet->getColumnNames()) {
cout << name << " ";
@@ -323,6 +364,9 @@ int main() {
cout << "insertTablets\n" << endl;
insertTablets();
+ cout << "insertTabletWithNullValues\n" << endl;
+ insertTabletWithNullValues();
+
cout << "nonQuery\n" << endl;
nonQuery();
diff --git a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 3235c3d..f2f8a66 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -57,19 +56,9 @@ public class SessionUtils {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static ByteBuffer getValueBuffer(Tablet tablet) {
ByteBuffer valueBuffer = ByteBuffer.allocate(tablet.getTotalValueOccupation());
- int indexOfValues = 0;
for (int i = 0; i < tablet.getSchemas().size(); i++) {
- IMeasurementSchema schema = tablet.getSchemas().get(i);
- if (schema instanceof MeasurementSchema) {
- getValueBufferOfDataType(schema.getType(), tablet, indexOfValues, valueBuffer);
- indexOfValues++;
- } else {
- for (int j = 0; j < schema.getSubMeasurementsTSDataTypeList().size(); j++) {
- getValueBufferOfDataType(
- schema.getSubMeasurementsTSDataTypeList().get(j), tablet, indexOfValues, valueBuffer);
- indexOfValues++;
- }
- }
+ MeasurementSchema schema = tablet.getSchemas().get(i);
+ getValueBufferOfDataType(schema.getType(), tablet, i, valueBuffer);
}
if (tablet.bitMaps != null) {
for (BitMap bitMap : tablet.bitMaps) {