You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/07 09:04:14 UTC
[incubator-iotdb] branch master updated: Change the return content in TSQueryDataSet and remove the row record in IoTDBQueryResultSet (#626)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 036a9fa Change the return content in TSQueryDataSet and remove the row record in IoTDBQueryResultSet (#626)
036a9fa is described below
commit 036a9fa586e0b473070c444e5fd308a2da0376a3
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Sat Dec 7 17:04:03 2019 +0800
Change the return content in TSQueryDataSet and remove the row record in IoTDBQueryResultSet (#626)
* optimize the TSQueryDataSet to the best in rpc
---
.../org/apache/iotdb/jdbc/IoTDBQueryResultSet.java | 204 +++++++++++++--------
.../src/main/java/org/apache/iotdb/jdbc/Utils.java | 148 ---------------
.../apache/iotdb/jdbc/IoTDBQueryResultSetTest.java | 114 +++++++-----
.../test/java/org/apache/iotdb/jdbc/UtilsTest.java | 175 +-----------------
.../org/apache/iotdb/db/service/TSServiceImpl.java | 73 ++------
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 90 ++++++---
.../apache/iotdb/db/integration/IoTDBDaemonIT.java | 22 +--
.../integration/IoTDBLoadExternalTsfileTest.java | 19 +-
.../iotdb/db/integration/IoTDBQueryDemoIT.java | 16 +-
service-rpc/rpc-changelist.md | 1 +
service-rpc/src/main/thrift/rpc.thrift | 8 +-
.../org/apache/iotdb/session/SessionDataSet.java | 141 ++++++++++----
.../org/apache/iotdb/session/SessionUtils.java | 78 +-------
.../org/apache/iotdb/session/IoTDBSessionIT.java | 19 +-
14 files changed, 410 insertions(+), 698 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
index 02096c4..d40b955 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
@@ -19,51 +19,27 @@
package org.apache.iotdb.jdbc;
-import java.io.IOException;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.thrift.TException;
+
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.math.MathContext;
import java.net.URL;
import java.nio.ByteBuffer;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.Clob;
import java.sql.Date;
-import java.sql.NClob;
-import java.sql.Ref;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.RowId;
-import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.sql.SQLXML;
-import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.rpc.IoTDBRPCException;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.thrift.TException;
-import org.slf4j.LoggerFactory;
+import java.sql.*;
+import java.util.*;
public class IoTDBQueryResultSet implements ResultSet {
- private static final org.slf4j.Logger logger = LoggerFactory.getLogger(IoTDBQueryResultSet.class);
private static final String TIMESTAMP_STR = "Time";
private static final int START_INDEX = 2;
private Statement statement = null;
@@ -76,12 +52,15 @@ public class IoTDBQueryResultSet implements ResultSet {
private List<String> columnTypeList; // no deduplication
private Map<String, Integer> columnInfoMap; // used because the server returns deduplicated columns
private List<String> columnTypeDeduplicatedList; // deduplicated from columnTypeList
- private RowRecord record;
+ private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
private int fetchSize;
private boolean emptyResultSet = false;
private TSQueryDataSet tsQueryDataSet = null;
- private ByteBuffer byteBuffer = null;
+ private byte[] time; // used to cache the current time value
+ private byte[][] values; // used to cache the current row record value
+ private byte[] currentBitmap; // used to cache the current bitmap for every column
+ private static final int flag = 0x80; // used to do `and` operation with bitmap to judge whether the value is null
private long queryId;
private boolean ignoreTimeStamp = false;
@@ -98,6 +77,10 @@ public class IoTDBQueryResultSet implements ResultSet {
this.fetchSize = statement.getFetchSize();
this.columnTypeList = columnTypeList;
+ time = new byte[Long.BYTES];
+ values = new byte[columnNameList.size()][];
+ currentBitmap = new byte[columnNameList.size()];
+
this.columnInfoList = new ArrayList<>();
this.columnInfoList.add(TIMESTAMP_STR);
// deduplicate and map
@@ -267,10 +250,10 @@ public class IoTDBQueryResultSet implements ResultSet {
public boolean getBoolean(String columnName) throws SQLException {
checkRecord();
int index = columnInfoMap.get(columnName) - START_INDEX;
- Field field = record.getFields().get(index);
- if (field.getDataType() != null && field.getDataType() == TSDataType.BOOLEAN) {
- return field.getBoolV();
- } else {
+ if (values[index] != null) {
+ return BytesUtils.bytesToBool(values[index]);
+ }
+ else {
throw new SQLException(
String.format("The value got by %s (column name) is NULL.", columnName));
}
@@ -355,12 +338,12 @@ public class IoTDBQueryResultSet implements ResultSet {
public double getDouble(String columnName) throws SQLException {
checkRecord();
int index = columnInfoMap.get(columnName) - START_INDEX;
- Field field = record.getFields().get(index);
- if (field.getDataType() != null && field.getDataType() == TSDataType.DOUBLE) {
- return field.getDoubleV();
- } else {
+ if (values[index] != null) {
+ return BytesUtils.bytesToDouble(values[index]);
+ }
+ else {
throw new SQLException(
- String.format("The value got by %s (column name) is NULL.", columnName));
+ String.format("The value got by %s (column name) is NULL.", columnName));
}
}
@@ -393,12 +376,12 @@ public class IoTDBQueryResultSet implements ResultSet {
public float getFloat(String columnName) throws SQLException {
checkRecord();
int index = columnInfoMap.get(columnName) - START_INDEX;
- Field field = record.getFields().get(index);
- if (field.getDataType() != null && field.getDataType() == TSDataType.FLOAT) {
- return field.getFloatV();
- } else {
+ if (values[index] != null) {
+ return BytesUtils.bytesToFloat(values[index]);
+ }
+ else {
throw new SQLException(
- String.format("The value got by %s (column name) is NULL.", columnName));
+ String.format("The value got by %s (column name) is NULL.", columnName));
}
}
@@ -416,12 +399,12 @@ public class IoTDBQueryResultSet implements ResultSet {
public int getInt(String columnName) throws SQLException {
checkRecord();
int index = columnInfoMap.get(columnName) - START_INDEX;
- Field field = record.getFields().get(index);
- if (field.getDataType() != null && field.getDataType() == TSDataType.INT32) {
- return field.getIntV();
- } else {
+ if (values[index] != null) {
+ return BytesUtils.bytesToInt(values[index]);
+ }
+ else {
throw new SQLException(
- String.format("The value got by %s (column name) is NULL.", columnName));
+ String.format("The value got by %s (column name) is NULL.", columnName));
}
}
@@ -434,15 +417,15 @@ public class IoTDBQueryResultSet implements ResultSet {
public long getLong(String columnName) throws SQLException {
checkRecord();
if (columnName.equals(TIMESTAMP_STR)) {
- return record.getTimestamp();
+ return BytesUtils.bytesToLong(time);
}
int index = columnInfoMap.get(columnName) - START_INDEX;
- Field field = record.getFields().get(index);
- if (field.getDataType() != null && field.getDataType() == TSDataType.INT64) {
- return field.getLongV();
- } else {
+ if (values[index] != null) {
+ return BytesUtils.bytesToLong(values[index]);
+ }
+ else {
throw new SQLException(
- String.format("The value got by %s (column name) is NULL.", columnName));
+ String.format("The value got by %s (column name) is NULL.", columnName));
}
}
@@ -688,7 +671,7 @@ public class IoTDBQueryResultSet implements ResultSet {
@Override
public boolean next() throws SQLException {
- if ((tsQueryDataSet == null || !byteBuffer.hasRemaining()) && !emptyResultSet) {
+ if ((tsQueryDataSet == null || !tsQueryDataSet.time.hasRemaining()) && !emptyResultSet) {
TSFetchResultsReq req = new TSFetchResultsReq(sql, fetchSize, queryId);
try {
TSFetchResultsResp resp = client.fetchResults(req);
@@ -701,11 +684,7 @@ public class IoTDBQueryResultSet implements ResultSet {
emptyResultSet = true;
} else {
tsQueryDataSet = resp.getQueryDataSet();
- try {
- byteBuffer = Utils.convertResultBuffer(tsQueryDataSet, columnTypeDeduplicatedList);
- } catch (IOException e) {
- throw new IoTDBSQLException(e.getMessage());
- }
+ rowsIndex = 0;
}
} catch (TException e) {
throw new SQLException(
@@ -716,10 +695,72 @@ public class IoTDBQueryResultSet implements ResultSet {
if (emptyResultSet) {
return false;
}
- record = Utils.getRowRecord(byteBuffer, columnTypeDeduplicatedList);
+ constructOneRow();
return true;
}
+ private void constructOneRow() {
+ tsQueryDataSet.time.get(time);
+ for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
+ ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
+ // at the start of every group of 8 rows, read the next byte from the bitmap buffer
+ if (rowsIndex % 8 == 0) {
+ currentBitmap[i] = bitmapBuffer.get();
+ }
+ values[i] = null;
+ if (!isNull(i, rowsIndex)) {
+ ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
+ TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
+ switch (dataType) {
+ case BOOLEAN:
+ if (values[i] == null)
+ values[i] = new byte[1];
+ valueBuffer.get(values[i]);
+ break;
+ case INT32:
+ if (values[i] == null)
+ values[i] = new byte[Integer.BYTES];
+ valueBuffer.get(values[i]);
+ break;
+ case INT64:
+ if (values[i] == null)
+ values[i] = new byte[Long.BYTES];
+ valueBuffer.get(values[i]);
+ break;
+ case FLOAT:
+ if (values[i] == null)
+ values[i] = new byte[Float.BYTES];
+ valueBuffer.get(values[i]);
+ break;
+ case DOUBLE:
+ if (values[i] == null)
+ values[i] = new byte[Double.BYTES];
+ valueBuffer.get(values[i]);
+ break;
+ case TEXT:
+ int length = valueBuffer.getInt();
+ values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
+ }
+ }
+ }
+ rowsIndex++;
+ }
+
+ /**
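+ * Judges whether the value of the specified column is null at row rowNum, using the cached bitmap byte.
+ * @param index column index
+ * @return true if the bit for this row (rowNum % 8, counted from the highest bit) is 0, i.e. the value is null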
+ */
+ private boolean isNull(int index, int rowNum) {
+ byte bitmap = currentBitmap[index];
+ int shift = rowNum % 8;
+ return ((flag >>> shift) & bitmap) == 0;
+ }
+
@Override
public boolean previous() throws SQLException {
throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
@@ -1172,7 +1213,7 @@ public class IoTDBQueryResultSet implements ResultSet {
}
private void checkRecord() throws SQLException {
- if (record == null) {
+ if (Objects.isNull(tsQueryDataSet)) {
throw new SQLException("No record remains");
}
}
@@ -1191,14 +1232,29 @@ public class IoTDBQueryResultSet implements ResultSet {
private String getValueByName(String columnName) throws SQLException {
checkRecord();
if (columnName.equals(TIMESTAMP_STR)) {
- return String.valueOf(record.getTimestamp());
+ return String.valueOf(BytesUtils.bytesToLong(time));
}
int index = columnInfoMap.get(columnName) - START_INDEX;
- if (index < 0 || index >= record.getFields().size()) {
+ if (index < 0 || index >= values.length || values[index] == null) {
return null;
}
- Field field = record.getFields().get(index);
- return field.getDataType() == null ? null : field.getStringValue();
+ TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(index));
+ switch (dataType) {
+ case BOOLEAN:
+ return String.valueOf(BytesUtils.bytesToBool(values[index]));
+ case INT32:
+ return String.valueOf(BytesUtils.bytesToInt(values[index]));
+ case INT64:
+ return String.valueOf(BytesUtils.bytesToLong(values[index]));
+ case FLOAT:
+ return String.valueOf(BytesUtils.bytesToFloat(values[index]));
+ case DOUBLE:
+ return String.valueOf(BytesUtils.bytesToDouble(values[index]));
+ case TEXT:
+ return new String(values[index]);
+ default:
+ return null;
+ }
}
public boolean isIgnoreTimeStamp() {
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
index 3894e7e..7f186df 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
@@ -18,22 +18,9 @@
*/
package org.apache.iotdb.jdbc;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
/**
* Utils to convert between thrift format and TsFile format.
@@ -73,139 +60,4 @@ public class Utils {
return params;
}
- static ByteBuffer convertResultBuffer(TSQueryDataSet tsQueryDataSet,
- List<String> columnTypeList) throws IOException {
- int rowCount = tsQueryDataSet.getRowCount();
- ByteBuffer byteBuffer = tsQueryDataSet.bufferForValues();
- DataOutputStream[] dataOutputStreams = new DataOutputStream[rowCount];
- ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[rowCount];
-
- // process time buffer
- processTimestamp(byteBuffer, rowCount, dataOutputStreams, byteArrayOutputStreams);
- int valueOccupation = 0;
- for (String type : columnTypeList) {
- TSDataType dataType = TSDataType.valueOf(type);
- for (int i = 0; i < rowCount; i++) {
- boolean is_empty = BytesUtils.byteToBool(byteBuffer.get());
- if (is_empty) {
- dataOutputStreams[i].writeBoolean(true);
- } else {
- dataOutputStreams[i].writeBoolean(false);
- switch (dataType) {
- case BOOLEAN:
- boolean booleanValue = BytesUtils.byteToBool(byteBuffer.get());
- dataOutputStreams[i].writeBoolean(booleanValue);
- valueOccupation += 1;
- break;
- case INT32:
- int intValue = byteBuffer.getInt();
- dataOutputStreams[i].writeInt(intValue);
- valueOccupation += 4;
- break;
- case INT64:
- long longValue = byteBuffer.getLong();
- dataOutputStreams[i].writeLong(longValue);
- valueOccupation += 8;
- break;
- case FLOAT:
- float floatValue = byteBuffer.getFloat();
- dataOutputStreams[i].writeFloat(floatValue);
- valueOccupation += 4;
- break;
- case DOUBLE:
- double doubleValue = byteBuffer.getDouble();
- dataOutputStreams[i].writeDouble(doubleValue);
- valueOccupation += 8;
- break;
- case TEXT:
- int binarySize = byteBuffer.getInt();
- byte[] binaryValue = new byte[binarySize];
- byteBuffer.get(binaryValue);
- dataOutputStreams[i].writeInt(binarySize);
- dataOutputStreams[i].write(binaryValue);
- valueOccupation = valueOccupation + 4 + binaryValue.length;
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", type));
- }
- }
- }
- }
- valueOccupation += rowCount * 8;
- valueOccupation += rowCount * columnTypeList.size();
- ByteBuffer resultBuffer = ByteBuffer.allocate(valueOccupation);
- for (int i = 0; i < rowCount; i++) {
- resultBuffer.put(byteArrayOutputStreams[i].toByteArray());
- }
- resultBuffer.flip(); // PAY ATTENTION TO HERE
-
- return resultBuffer;
- }
-
- private static void processTimestamp(ByteBuffer byteBuffer, int rowCount,
- DataOutputStream[] dataOutputStreams, ByteArrayOutputStream[] byteArrayOutputStreams)
- throws IOException {
- for (int i = 0; i < rowCount; i++) {
- byteArrayOutputStreams[i] = new ByteArrayOutputStream();
- dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
- long timestamp = byteBuffer.getLong(); // byteBuffer has been flipped by the server side
- dataOutputStreams[i].writeLong(timestamp);
- }
- }
-
- public static RowRecord getRowRecord(ByteBuffer byteBuffer, List<String> columnTypeList)
- throws BufferUnderflowException {
- if (byteBuffer.hasRemaining()) {
- long timestamp = byteBuffer.getLong();
- RowRecord record = new RowRecord(timestamp);
- Field field = null;
- for (String type : columnTypeList) {
- boolean is_empty = BytesUtils.byteToBool(byteBuffer.get());
- if (is_empty) {
- field = new Field(null);
- } else {
- TSDataType dataType = TSDataType.valueOf(type);
- field = new Field(dataType);
- switch (dataType) {
- case BOOLEAN:
- boolean booleanValue = BytesUtils.byteToBool(byteBuffer.get());
- field.setBoolV(booleanValue);
- break;
- case INT32:
- int intValue = byteBuffer.getInt();
- field.setIntV(intValue);
- break;
- case INT64:
- long longValue = byteBuffer.getLong();
- field.setLongV(longValue);
- break;
- case FLOAT:
- float floatValue = byteBuffer.getFloat();
- field.setFloatV(floatValue);
- break;
- case DOUBLE:
- double doubleValue = byteBuffer.getDouble();
- field.setDoubleV(doubleValue);
- break;
- case TEXT:
- int binarySize = byteBuffer.getInt();
- byte[] binaryValue = new byte[binarySize];
- byteBuffer.get(binaryValue);
- field.setBinaryV(new Binary(binaryValue));
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", type));
- }
- }
- record.getFields().add(field);
- }
- return record;
- }
- else {
- return null;
- }
- }
-
}
\ No newline at end of file
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
index 5331cc5..fffcc54 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
@@ -18,11 +18,14 @@
*/
package org.apache.iotdb.jdbc;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -34,28 +37,11 @@ import java.sql.Statement;
import java.sql.Types;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSHandleIdentifier;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
/*
This class is designed to test the function of TsfileQueryResultSet.
@@ -230,7 +216,6 @@ public class IoTDBQueryResultSetTest {
resultStr.append(resultSet.getString(i)).append(",");
}
resultStr.append("\n");
-
fetchResultsResp.hasResultSet = false; // at the second time to fetch
}
String standard =
@@ -269,12 +254,10 @@ public class IoTDBQueryResultSetTest {
{105L, 11.11F, 199L, 33333,},
{1000L, 1000.11F, 55555L, 22222,}};
- TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
- int rowCount = input.length;
- tsQueryDataSet.setRowCount(rowCount);
-
int columnNum = tsDataTypeList.size();
- int columnNumWithTime = columnNum + 1;
+ TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+ // one time column, plus for each value column an actual value buffer and a bitmap buffer that indicates whether each value is null
+ int columnNumWithTime = columnNum * 2 + 1;
DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
for (int i = 0; i < columnNumWithTime; i++) {
@@ -282,43 +265,74 @@ public class IoTDBQueryResultSetTest {
dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
}
- int valueOccupation = 0;
+ int rowCount = input.length;
+ int[] valueOccupation = new int[columnNum];
+ // used to record a bitmap for every 8 row record
+ int[] bitmap = new int[columnNum];
for (int i = 0; i < rowCount; i++) {
Object[] row = input[i];
// use columnOutput to write byte array
- dataOutputStreams[0].writeLong((long) row[0]);
+ dataOutputStreams[0].writeLong((long)row[0]);
for (int k = 0; k < columnNum; k++) {
- DataOutputStream dataOutputStream = dataOutputStreams[k + 1]; // DO NOT FORGET +1
Object value = row[1 + k];
+ DataOutputStream dataOutputStream = dataOutputStreams[2*k + 1]; // DO NOT FORGET +1
if (value == null) {
- dataOutputStream.writeBoolean(true); // is_empty true
+ bitmap[k] = (bitmap[k] << 1);
} else {
- dataOutputStream.writeBoolean(false); // is_empty false
+ bitmap[k] = (bitmap[k] << 1) | 0x01;
if (k == 0) { // TSDataType.FLOAT
- dataOutputStream.writeFloat((float) value);
- valueOccupation += 4;
+ dataOutputStream.writeFloat((float)value);
+ valueOccupation[k] += 4;
} else if (k == 1) { // TSDataType.INT64
dataOutputStream.writeLong((long) value);
- valueOccupation += 8;
+ valueOccupation[k] += 8;
} else { // TSDataType.INT32
dataOutputStream.writeInt((int) value);
- valueOccupation += 4;
+ valueOccupation[k] += 4;
}
}
}
+ if (i % 8 == 7) {
+ for (int j = 0; j < bitmap.length; j++) {
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
+ dataBitmapOutputStream.writeByte(bitmap[j]);
+ // we should clear the bitmap every 8 row record
+ bitmap[j] = 0;
+ }
+ }
}
- // calculate total valueOccupation
- valueOccupation += rowCount * 8; // note the timestamp column needn't the boolean is_empty
- valueOccupation += rowCount * columnNum; // for all is_empty
+ // feed the remaining bitmap
+ for (int j = 0; j < bitmap.length; j++) {
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
+ dataBitmapOutputStream.writeByte(bitmap[j] << (8 - rowCount % 8));
+ }
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation);
- for (ByteArrayOutputStream byteArrayOutputStream : byteArrayOutputStreams) {
- valueBuffer.put(byteArrayOutputStream.toByteArray());
+ // calculate the time buffer size
+ int timeOccupation = rowCount * 8;
+ ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+ timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+ timeBuffer.flip();
+ tsQueryDataSet.setTime(timeBuffer);
+
+ // calculate the bitmap buffer size
+ int bitmapOccupation = rowCount / 8 + 1;
+
+ List<ByteBuffer> bitmapList = new LinkedList<>();
+ List<ByteBuffer> valueList = new LinkedList<>();
+ for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i-1)/2]);
+ valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+ valueBuffer.flip();
+ valueList.add(valueBuffer);
+
+ ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+ bitmapBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+ bitmapBuffer.flip();
+ bitmapList.add(bitmapBuffer);
}
- valueBuffer.flip(); // PAY ATTENTION TO HERE
- tsQueryDataSet.setValues(valueBuffer);
- tsQueryDataSet.setRowCount(rowCount);
+ tsQueryDataSet.setBitmapList(bitmapList);
+ tsQueryDataSet.setValueList(valueList);
return tsQueryDataSet;
}
}
\ No newline at end of file
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java
index 8535995..34a0c3e 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java
@@ -18,31 +18,19 @@
*/
package org.apache.iotdb.jdbc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSStatusType;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
public class UtilsTest {
@Before
@@ -123,157 +111,4 @@ public class UtilsTest {
fail();
}
- @Test
- public void testConvertRowRecords() throws IOException {
- Object[][] input = {
- {100L, "sensor1_boolean", TSDataType.BOOLEAN, false,
- "sensor1_int32", TSDataType.INT32, 100,
- "sensor1_int64", TSDataType.INT64, 9999999999L,
- "sensor1_float", TSDataType.FLOAT, 1.23f,
- "sensor1_double", TSDataType.DOUBLE, 1004234.435d,
- "sensor1_text", TSDataType.TEXT, "iotdb-jdbc",},
- {200L, "sensor2_boolean", TSDataType.BOOLEAN, true,
- "sensor2_int32", null, null,
- "sensor2_int64", TSDataType.INT64, -9999999999L,
- "sensor2_float", null, null,
- "sensor2_double", TSDataType.DOUBLE, -1004234.435d,
- "sensor2_text", null, null,},
- {300L, "sensor3_boolean", null, null,
- "sensor3_int32", TSDataType.INT32, -100,
- "sensor3_int64", null, null,
- "sensor3_float", TSDataType.FLOAT, -1.23f,
- "sensor3_double", null, null,
- "sensor3_text", TSDataType.TEXT, "jdbc-iotdb",},};
-
- // create the TSQueryDataSet in the similar way as the server does
- TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
- int rowCount = input.length;
- tsQueryDataSet.setRowCount(rowCount);
- int columnNum = 6;
- int columnNumWithTime = columnNum + 1;
- DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
- ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
- for (int i = 0; i < columnNumWithTime; i++) {
- byteArrayOutputStreams[i] = new ByteArrayOutputStream();
- dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
- }
- int valueOccupation = 0;
- for (int i = 0; i < rowCount; i++) {
- Object[] row = input[i];
- // use columnOutput to write byte array
- dataOutputStreams[0].writeLong((long) row[0]);
- for (int k = 0; k < columnNum; k++) {
- DataOutputStream dataOutputStream = dataOutputStreams[k + 1]; // DO NOT FORGET +1
- Object type = row[1 + 3 * k + 1];
- Object value = row[1 + 3 * k + 2];
- if (type == null) {
- dataOutputStream.writeBoolean(true); // is_empty true
- } else {
- dataOutputStream.writeBoolean(false); // is_empty false
- TSDataType dataType = (TSDataType) type;
- switch (dataType) {
- case INT32:
- dataOutputStream.writeInt((int) value);
- valueOccupation += 4;
- break;
- case INT64:
- dataOutputStream.writeLong((long) value);
- valueOccupation += 8;
- break;
- case FLOAT:
- dataOutputStream.writeFloat((float) value);
- valueOccupation += 4;
- break;
- case DOUBLE:
- dataOutputStream.writeDouble((double) value);
- valueOccupation += 8;
- break;
- case BOOLEAN:
- dataOutputStream.writeBoolean((boolean) value);
- valueOccupation += 1;
- break;
- case TEXT:
- Binary binaryValue = new Binary((String) value);
- dataOutputStream.writeInt(binaryValue.getLength());
- dataOutputStream.write(binaryValue.getValues());
- valueOccupation = valueOccupation + 4 + binaryValue.getLength();
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", type));
- }
- }
- }
- }
- // calculate total valueOccupation
- valueOccupation += rowCount * 8; // note the timestamp column needn't the boolean is_empty
- valueOccupation += rowCount * columnNum; // for all is_empty
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation);
- for (ByteArrayOutputStream byteArrayOutputStream : byteArrayOutputStreams) {
- valueBuffer.put(byteArrayOutputStream.toByteArray());
- }
- valueBuffer.flip(); // PAY ATTENTION TO HERE
- tsQueryDataSet.setValues(valueBuffer);
- tsQueryDataSet.setRowCount(rowCount);
-
- // begin convert
- List<String> columnTypeList = new ArrayList<>();
- columnTypeList.add(TSDataType.BOOLEAN.toString());
- columnTypeList.add(TSDataType.INT32.toString());
- columnTypeList.add(TSDataType.INT64.toString());
- columnTypeList.add(TSDataType.FLOAT.toString());
- columnTypeList.add(TSDataType.DOUBLE.toString());
- columnTypeList.add(TSDataType.TEXT.toString());
- ByteBuffer resultBuffer = Utils.convertResultBuffer(tsQueryDataSet, columnTypeList);
- RowRecord r = Utils.getRowRecord(resultBuffer, columnTypeList);
- int index = 0;
- while (r != null) {
- assertEquals(input[index][0], r.getTimestamp());
- List<Field> fields = r.getFields();
- int j = 0;
- for (Field f : fields) {
- if (j == 0) {
- if (input[index][3 * j + 3] == null) {
- assertTrue(f.getDataType() == null);
- } else {
- assertEquals(input[index][3 * j + 3], f.getBoolV());
- }
- } else if (j == 1) {
- if (input[index][3 * j + 3] == null) {
- assertTrue(f.getDataType() == null);
- } else {
- assertEquals(input[index][3 * j + 3], f.getIntV());
- }
- } else if (j == 2) {
- if (input[index][3 * j + 3] == null) {
- assertTrue(f.getDataType() == null);
- } else {
- assertEquals(input[index][3 * j + 3], f.getLongV());
- }
- } else if (j == 3) {
- if (input[index][3 * j + 3] == null) {
- assertTrue(f.getDataType() == null);
- } else {
- assertEquals(input[index][3 * j + 3], f.getFloatV());
- }
- } else if (j == 4) {
- if (input[index][3 * j + 3] == null) {
- assertTrue(f.getDataType() == null);
- } else {
- assertEquals(input[index][3 * j + 3], f.getDoubleV());
- }
- } else {
- if (input[index][3 * j + 3] == null) {
- assertTrue(f.getDataType() == null);
- } else {
- assertEquals(input[index][3 * j + 3], f.getStringValue());
- }
- }
- j++;
- }
- index++;
- r = Utils.getRowRecord(resultBuffer, columnTypeList);
- }
- }
-
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 881ae39..af471db 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,29 +18,6 @@
*/
package org.apache.iotdb.db.service;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ITEM;
-import static org.apache.iotdb.db.conf.IoTDBConstant.PARAMETER;
-import static org.apache.iotdb.db.conf.IoTDBConstant.PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.USER;
-import static org.apache.iotdb.db.conf.IoTDBConstant.VALUE;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Vector;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -68,49 +45,14 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteInsertRowInBatchResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSHandleIdentifier;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -123,6 +65,15 @@ import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.*;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+
/**
* Thrift RPC implementation at server side.
*/
@@ -933,7 +884,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} else {
result = QueryDataSetUtils.convertQueryDataSetByFetchSize(queryDataSet, req.fetchSize);
}
- boolean hasResultSet = (result.getRowCount() != 0);
+ boolean hasResultSet = (result.bufferForTime().limit() != 0);
if (!hasResultSet && queryId2DataSet.get() != null) {
queryId2DataSet.get().remove(req.queryId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index dda4073..8fa3f43 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,11 +18,6 @@
*/
package org.apache.iotdb.db.utils;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -33,11 +28,20 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* TimeValuePairUtils to convert between thrift format and TsFile format.
*/
public class QueryDataSetUtils {
+ private static final int flag = 0x01;
+
private QueryDataSetUtils() {
}
@@ -58,7 +62,8 @@ public class QueryDataSetUtils {
List<TSDataType> dataTypes = queryDataSet.getDataTypes();
int columnNum = dataTypes.size();
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
- int columnNumWithTime = columnNum + 1;
+ // one time column, and each value column has an actual value buffer and a bitmap buffer that indicates whether each value is null
+ int columnNumWithTime = columnNum * 2 + 1;
DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
for (int i = 0; i < columnNumWithTime; i++) {
@@ -67,10 +72,11 @@ public class QueryDataSetUtils {
}
int rowCount = 0;
- int valueOccupation = 0;
+ int[] valueOccupation = new int[columnNum];
+ // per-column bitmap that accumulates the null-indicator bits of up to 8 consecutive row records
+ int[] bitmap = new int[columnNum];
for (int i = 0; i < fetchSize; i++) {
if (queryDataSet.hasNext()) {
- rowCount++;
RowRecord rowRecord = queryDataSet.next();
if (watermarkEncoder != null) {
rowRecord = watermarkEncoder.encodeRecord(rowRecord);
@@ -80,37 +86,37 @@ public class QueryDataSetUtils {
List<Field> fields = rowRecord.getFields();
for (int k = 0; k < fields.size(); k++) {
Field field = fields.get(k);
- DataOutputStream dataOutputStream = dataOutputStreams[k + 1]; // DO NOT FORGET +1
+ DataOutputStream dataOutputStream = dataOutputStreams[2*k + 1]; // DO NOT FORGET +1
if (field.getDataType() == null) {
- dataOutputStream.writeBoolean(true); // is_empty true
+ bitmap[k] = (bitmap[k] << 1);
} else {
- dataOutputStream.writeBoolean(false); // is_empty false
+ bitmap[k] = (bitmap[k] << 1) | flag;
TSDataType type = field.getDataType();
switch (type) {
case INT32:
dataOutputStream.writeInt(field.getIntV());
- valueOccupation += 4;
+ valueOccupation[k] += 4;
break;
case INT64:
dataOutputStream.writeLong(field.getLongV());
- valueOccupation += 8;
+ valueOccupation[k] += 8;
break;
case FLOAT:
dataOutputStream.writeFloat(field.getFloatV());
- valueOccupation += 4;
+ valueOccupation[k] += 4;
break;
case DOUBLE:
dataOutputStream.writeDouble(field.getDoubleV());
- valueOccupation += 8;
+ valueOccupation[k] += 8;
break;
case BOOLEAN:
dataOutputStream.writeBoolean(field.getBoolV());
- valueOccupation += 1;
+ valueOccupation[k] += 1;
break;
case TEXT:
dataOutputStream.writeInt(field.getBinaryV().getLength());
dataOutputStream.write(field.getBinaryV().getValues());
- valueOccupation = valueOccupation + 4 + field.getBinaryV().getLength();
+ valueOccupation[k] = valueOccupation[k] + 4 + field.getBinaryV().getLength();
break;
default:
throw new UnSupportedDataTypeException(
@@ -118,22 +124,54 @@ public class QueryDataSetUtils {
}
}
}
+ rowCount++;
+ if (rowCount % 8 == 0) {
+ for (int j = 0; j < bitmap.length; j++) {
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
+ dataBitmapOutputStream.writeByte(bitmap[j]);
+ // we should clear the bitmap every 8 row record
+ bitmap[j] = 0;
+ }
+ }
} else {
break;
}
}
- // calculate total valueOccupation
- valueOccupation += rowCount * 8; // note the timestamp column needn't the boolean is_empty
- valueOccupation += rowCount * dataTypes.size(); // for all is_empty
+ // flush the remaining bitmap bits (fewer than 8 rows), left-aligned within one final byte per column
+ int remaining = rowCount % 8;
+ if (remaining != 0) {
+ for (int j = 0; j < bitmap.length; j++) {
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
+ dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+ }
+ }
+
+ // calculate the time buffer size
+ int timeOccupation = rowCount * 8;
+ ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+ timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+ timeBuffer.flip();
+ tsQueryDataSet.setTime(timeBuffer);
+
+ // calculate the bitmap buffer size
+ int bitmapOccupation = rowCount / 8 + (rowCount % 8 == 0 ? 0 : 1);
+
+ List<ByteBuffer> bitmapList = new LinkedList<>();
+ List<ByteBuffer> valueList = new LinkedList<>();
+ for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i-1)/2]);
+ valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+ valueBuffer.flip();
+ valueList.add(valueBuffer);
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation);
- for (ByteArrayOutputStream byteArrayOutputStream : byteArrayOutputStreams) {
- valueBuffer.put(byteArrayOutputStream.toByteArray());
+ ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+ bitmapBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+ bitmapBuffer.flip();
+ bitmapList.add(bitmapBuffer);
}
- valueBuffer.flip(); // PAY ATTENTION TO HERE
- tsQueryDataSet.setValues(valueBuffer);
- tsQueryDataSet.setRowCount(rowCount);
+ tsQueryDataSet.setBitmapList(bitmapList);
+ tsQueryDataSet.setValueList(valueList);
return tsQueryDataSet;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
index bd88233..ed0422b 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
@@ -18,22 +18,6 @@
*/
package org.apache.iotdb.db.integration;
-import static org.apache.iotdb.db.integration.Constant.TIMESTAMP_STR;
-import static org.apache.iotdb.db.integration.Constant.d0s0;
-import static org.apache.iotdb.db.integration.Constant.d0s1;
-import static org.apache.iotdb.db.integration.Constant.d0s2;
-import static org.apache.iotdb.db.integration.Constant.d0s3;
-import static org.apache.iotdb.db.integration.Constant.d0s4;
-import static org.apache.iotdb.db.integration.Constant.d1s0;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -42,6 +26,12 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.sql.*;
+
+import static org.apache.iotdb.db.integration.Constant.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
/**
* Notice that, all test begins with "IoTDB" is integration test. All test which will start the
* IoTDB server should be defined as integration test.
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
index 5303558..9d3273b 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
@@ -18,18 +18,6 @@
*/
package org.apache.iotdb.db.integration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -43,6 +31,13 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
public class IoTDBLoadExternalTsfileTest {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLoadExternalTsfileTest.class);
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
index 341cf51..ebba549 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
@@ -18,15 +18,6 @@
*/
package org.apache.iotdb.db.integration;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -35,6 +26,10 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
public class IoTDBQueryDemoIT {
private static IoTDB daemon;
@@ -169,7 +164,6 @@ public class IoTDBQueryDemoIT {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
header.append(resultSetMetaData.getColumnName(i)).append(",");
}
- System.out.println(header.toString());
Assert.assertEquals(
"Time,root.ln.wf01.wt01.status,root.ln.wf01.wt01.temperature,"
+ "root.ln.wf02.wt02.hardware,root.ln.wf02.wt02.status,root.sgcc.wf03.wt01.status,"
@@ -188,7 +182,6 @@ public class IoTDBQueryDemoIT {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
builder.append(resultSet.getString(i)).append(",");
}
- System.out.println(builder.toString());
Assert.assertEquals(retArray[cnt], builder.toString());
cnt++;
}
@@ -226,7 +219,6 @@ public class IoTDBQueryDemoIT {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
header.append(resultSetMetaData.getColumnName(i)).append(",");
}
- System.out.println(header.toString());
Assert.assertEquals(
"Time,root.ln.wf01.wt01.status,root.ln.wf01.wt01.temperature,"
+ "root.ln.wf02.wt02.hardware,root.ln.wf02.wt02.status,root.sgcc.wf03.wt01.status,"
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 0b0194b..43c8a75 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -80,6 +80,7 @@ Last Updated on October 27th, 2019 by Lei Rui.
| Add optional set\<string> childPaths in TSFetchMetadataResp | Haonan Hou |
| Add optional string version in TSFetchMetadataResp | Genius_pig |
| Add required i64 statementId in TSExecuteStatementReq | Yuan Tian |
+| Add required binary time, required list<binary> valueList, required list<binary> bitmapList and remove required binary values, required i32 rowCount in TSQueryDataSet| Yuan Tian |
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index bf716a3..4384309 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -166,8 +166,12 @@ struct TSCloseOperationReq {
}
struct TSQueryDataSet{
- 1: required binary values
- 2: required i32 rowCount
+ // ByteBuffer for time column
+ 1: required binary time
+ // ByteBuffer for each column values
+ 2: required list<binary> valueList
+ // Bitmap for each column to indicate whether it is a null value
+ 3: required list<binary> bitmapList
}
struct TSFetchResultsReq{
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index fd055d3..6c2ac10 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -18,36 +18,41 @@
*/
package org.apache.iotdb.session;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.thrift.TException;
+
+import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import org.apache.iotdb.rpc.IoTDBRPCException;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.thrift.TException;
public class SessionDataSet {
private boolean getFlag = false;
private String sql;
private long queryId;
- private RowRecord record;
- private Iterator<RowRecord> recordItr;
- private TSIService.Iface client = null;
+ private TSIService.Iface client;
private TSOperationHandle operationHandle;
private int batchSize = 512;
private List<String> columnTypeDeduplicatedList;
+ private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
+ private TSQueryDataSet tsQueryDataSet = null;
+ private RowRecord rowRecord = null;
+ private byte[] currentBitmap; // used to cache the current bitmap for every column
+ private static final int flag = 0x80; // mask that is shifted and ANDed with a bitmap byte to judge whether the value is null
+
+
public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
long queryId, TSIService.Iface client, TSOperationHandle operationHandle) {
this.sql = sql;
@@ -76,49 +81,107 @@ public class SessionDataSet {
}
public boolean hasNext() throws SQLException, IoTDBRPCException {
- return getFlag || nextWithoutConstraints(sql, queryId);
- }
-
- public RowRecord next() throws SQLException, IoTDBRPCException {
- if (!getFlag) {
- nextWithoutConstraints(sql, queryId);
- }
-
- getFlag = false;
- return record;
- }
-
-
- private boolean nextWithoutConstraints(String sql, long queryId)
- throws SQLException, IoTDBRPCException {
- if ((recordItr == null || !recordItr.hasNext())) {
+ if (getFlag)
+ return true;
+ if (tsQueryDataSet == null || !tsQueryDataSet.time.hasRemaining()) {
TSFetchResultsReq req = new TSFetchResultsReq(sql, batchSize, queryId);
-
try {
TSFetchResultsResp resp = client.fetchResults(req);
-
RpcUtils.verifySuccess(resp.getStatus());
if (!resp.hasResultSet) {
return false;
} else {
- TSQueryDataSet tsQueryDataSet = resp.getQueryDataSet();
- List<RowRecord> records = SessionUtils
- .convertRowRecords(tsQueryDataSet, columnTypeDeduplicatedList);
- recordItr = records.iterator();
+ tsQueryDataSet = resp.getQueryDataSet();
+ rowsIndex = 0;
}
} catch (TException e) {
throw new SQLException(
- "Cannot fetch result from server, because of network connection : {} ", e);
+ "Cannot fetch result from server, because of network connection: {} ", e);
}
}
- record = recordItr.next();
+ constructOneRow();
getFlag = true;
return true;
}
+ private void constructOneRow() {
+ rowRecord = new RowRecord(tsQueryDataSet.time.getLong());
+
+ for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
+ ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
+ // start of a new group of 8 rows: read the next byte from the bitmap buffer
+ if (rowsIndex % 8 == 0) {
+ currentBitmap[i] = bitmapBuffer.get();
+ }
+ Field field;
+ if (!isNull(i, rowsIndex)) {
+ ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
+ TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
+ field = new Field(dataType);
+ switch (dataType) {
+ case BOOLEAN:
+ boolean booleanValue = BytesUtils.byteToBool(valueBuffer.get());
+ field.setBoolV(booleanValue);
+ break;
+ case INT32:
+ int intValue = valueBuffer.getInt();
+ field.setIntV(intValue);
+ break;
+ case INT64:
+ long longValue = valueBuffer.getLong();
+ field.setLongV(longValue);
+ break;
+ case FLOAT:
+ float floatValue = valueBuffer.getFloat();
+ field.setFloatV(floatValue);
+ break;
+ case DOUBLE:
+ double doubleValue = valueBuffer.getDouble();
+ field.setDoubleV(doubleValue);
+ break;
+ case TEXT:
+ int binarySize = valueBuffer.getInt();
+ byte[] binaryValue = new byte[binarySize];
+ valueBuffer.get(binaryValue);
+ field.setBinaryV(new Binary(binaryValue));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
+ }
+ }
+ else {
+ field = new Field(null);
+ }
+ rowRecord.addField(field);
+ }
+ rowsIndex++;
+ }
+
+ /**
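+ * judge whether the value of the specified column is null at the given row
+ * @param index column index (rowNum is the row index within the current TSQueryDataSet)
+ * @return true if the value is null, i.e. the bit for this row in the cached bitmap byte is 0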
+ */
+ private boolean isNull(int index, int rowNum) {
+ byte bitmap = currentBitmap[index];
+ int shift = rowNum % 8;
+ return ((flag >>> shift) & bitmap) == 0;
+ }
+
+ public RowRecord next() throws SQLException, IoTDBRPCException {
+ if (!getFlag) {
+ if (!hasNext())
+ return null;
+ }
+
+ getFlag = false;
+ return rowRecord;
+ }
+
public void closeOperationHandle() throws SQLException {
try {
if (operationHandle != null) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
index c53c7f0..99b4532 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
@@ -18,18 +18,14 @@
*/
package org.apache.iotdb.session;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.write.record.RowBatch;
+import java.nio.ByteBuffer;
+
public class SessionUtils {
public static ByteBuffer getTimeBuffer(RowBatch rowBatch) {
@@ -91,74 +87,4 @@ public class SessionUtils {
valueBuffer.flip();
return valueBuffer;
}
-
-
- /**
- * convert row records.
- */
- static List<RowRecord> convertRowRecords(TSQueryDataSet tsQueryDataSet,
- List<String> columnTypeList) {
- int rowCount = tsQueryDataSet.getRowCount();
- ByteBuffer byteBuffer = tsQueryDataSet.bufferForValues();
-
- // process time buffer
- List<RowRecord> rowRecordList = processTimeAndCreateRowRecords(byteBuffer, rowCount);
-
- for (String type : columnTypeList) {
- for (int i = 0; i < rowCount; i++) {
- Field field = null;
- boolean is_empty = BytesUtils.byteToBool(byteBuffer.get());
- if (is_empty) {
- field = new Field(null);
- } else {
- TSDataType dataType = TSDataType.valueOf(type);
- field = new Field(dataType);
- switch (dataType) {
- case BOOLEAN:
- boolean booleanValue = BytesUtils.byteToBool(byteBuffer.get());
- field.setBoolV(booleanValue);
- break;
- case INT32:
- int intValue = byteBuffer.getInt();
- field.setIntV(intValue);
- break;
- case INT64:
- long longValue = byteBuffer.getLong();
- field.setLongV(longValue);
- break;
- case FLOAT:
- float floatValue = byteBuffer.getFloat();
- field.setFloatV(floatValue);
- break;
- case DOUBLE:
- double doubleValue = byteBuffer.getDouble();
- field.setDoubleV(doubleValue);
- break;
- case TEXT:
- int binarySize = byteBuffer.getInt();
- byte[] binaryValue = new byte[binarySize];
- byteBuffer.get(binaryValue);
- field.setBinaryV(new Binary(binaryValue));
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", type));
- }
- }
- rowRecordList.get(i).getFields().add(field);
- }
- }
- return rowRecordList;
- }
-
- private static List<RowRecord> processTimeAndCreateRowRecords(ByteBuffer byteBuffer,
- int rowCount) {
- List<RowRecord> rowRecordList = new ArrayList<>();
- for (int i = 0; i < rowCount; i++) {
- long timestamp = byteBuffer.getLong(); // byteBuffer has been flipped by the server side
- RowRecord rowRecord = new RowRecord(timestamp);
- rowRecordList.add(rowRecord);
- }
- return rowRecordList;
- }
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 2669381..6029372 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -18,18 +18,6 @@
*/
package org.apache.iotdb.session;
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -50,6 +38,13 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
public class IoTDBSessionIT {
private IoTDB daemon;