You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/01/15 05:47:52 UTC
[incubator-iotdb] branch master updated: [IOTDB-396] New query clause: disable align (#738)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 08346cc [IOTDB-396] New query clause: disable align (#738)
08346cc is described below
commit 08346cc47911ba3430f904a1bdf8979f9f150ada
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Wed Jan 15 13:47:47 2020 +0800
[IOTDB-396] New query clause: disable align (#738)
* disable align clause
Co-authored-by: Haonan <hh...@outlook.com>
---
.travis.yml | 2 +-
.../org/apache/iotdb/client/AbstractClient.java | 183 ++-
.../5-Operation Manual/4-SQL Reference.md | 38 +
.../5-Operation Manual/4-SQL Reference.md | 37 +
...yResultSet.java => AbstractIoTDBResultSet.java} | 254 +---
.../iotdb/jdbc/IoTDBNonAlignQueryResultSet.java | 209 ++++
.../org/apache/iotdb/jdbc/IoTDBQueryResultSet.java | 1303 ++------------------
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 26 +-
.../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 28 +-
.../iotdb/db/qp/physical/crud/QueryPlan.java | 9 +
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 2 +-
.../NewEngineDataSetWithoutValueFilter.java | 2 +-
.../db/query/dataset/NonAlignEngineDataSet.java | 353 ++++++
.../iotdb/db/query/executor/EngineExecutor.java | 23 +
.../iotdb/db/query/executor/EngineQueryRouter.java | 15 +-
.../org/apache/iotdb/db/rescon/MemTablePool.java | 5 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 275 +++--
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 1 -
.../iotdb/db/integration/IoTDBDisableAlignIT.java | 408 ++++++
service-rpc/rpc-changelist.md | 4 +-
service-rpc/src/main/thrift/rpc.thrift | 26 +-
session/pom.xml | 256 ++--
.../org/apache/iotdb/session/SessionDataSet.java | 2 +-
23 files changed, 1726 insertions(+), 1735 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 9d590e1..93ce19d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -175,7 +175,7 @@ matrix:
# - mvn -B apache-rat:check
# - mvn -B clean package -pl server,grafana,client,example,:kafka-example,:rocketmq-example -am integration-test
- os: linux
- if: fork = false #only fork=true (i.e., the committer has permission to write the repo)
+# if: fork = false #only fork=true (i.e., the committer has permission to write the repo)
name: sonar-analysis
dist: xenial
jdk: openjdk8
diff --git a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
index 23c2440..96f29f2 100644
--- a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
+++ b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
@@ -18,21 +18,6 @@
*/
package org.apache.iotdb.client;
-import java.io.PrintStream;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -45,6 +30,18 @@ import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import org.apache.iotdb.tool.ImportCsv;
import org.apache.thrift.TException;
+import java.io.PrintStream;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+
public abstract class AbstractClient {
static final String HOST_ARGS = "h";
@@ -99,6 +96,9 @@ public abstract class AbstractClient {
private static boolean isQuit = false;
static String TIMESTAMP_PRECISION = "ms";
+ private static final int START_PRINT_INDEX = 2;
+ private static final int NO_ALIGN_PRINT_INTERVAL = 2;
+
/**
* control the width of columns for 'show timeseries path' and 'show storage group'.
* <p>
@@ -189,16 +189,23 @@ public abstract class AbstractClient {
throws SQLException {
int cnt = 0;
boolean printTimestamp = true;
+ boolean align = true;
displayCnt = 0;
printHeader = false;
ResultSetMetaData resultSetMetaData = res.getMetaData();
int colCount = resultSetMetaData.getColumnCount();
- printTimestamp = !((IoTDBQueryResultSet) res).isIgnoreTimeStamp();
+
+ if (res instanceof IoTDBQueryResultSet) {
+ printTimestamp = !((IoTDBQueryResultSet) res).isIgnoreTimeStamp();
+ }
+ else {
+ align = false;
+ }
// Output values
while (cnt < maxPrintRowCount && res.next()) {
- printRow(printTimestamp, colCount, resultSetMetaData, res, zoneId);
+ printRow(printTimestamp, align, colCount, resultSetMetaData, res, zoneId);
cnt++;
if (!printToConsole && cnt % 10000 == 0) {
println(cnt);
@@ -207,11 +214,11 @@ public abstract class AbstractClient {
if (printToConsole) {
if (!printHeader) {
- printBlockLine(printTimestamp, colCount, resultSetMetaData);
- printName(printTimestamp, colCount, resultSetMetaData);
- printBlockLine(printTimestamp, colCount, resultSetMetaData);
+ printBlockLine(printTimestamp, align, colCount, resultSetMetaData);
+ printName(printTimestamp, align, colCount, resultSetMetaData);
+ printBlockLine(printTimestamp, align, colCount, resultSetMetaData);
} else {
- printBlockLine(printTimestamp, colCount, resultSetMetaData);
+ printBlockLine(printTimestamp, align, colCount, resultSetMetaData);
}
}
@@ -231,23 +238,23 @@ public abstract class AbstractClient {
println("Total line number = " + cnt);
}
- private static void printRow(boolean printTimestamp, int colCount,
+ private static void printRow(boolean printTimestamp, boolean align, int colCount,
ResultSetMetaData resultSetMetaData, ResultSet res, ZoneId zoneId)
throws SQLException {
// Output Labels
if (!printToConsole) {
return;
}
- printHeader(printTimestamp, colCount, resultSetMetaData);
- printRowData(printTimestamp, res, zoneId, resultSetMetaData, colCount);
+ printHeader(printTimestamp, align, colCount, resultSetMetaData);
+ printRowData(printTimestamp, align, res, zoneId, resultSetMetaData, colCount);
}
- private static void printHeader(boolean printTimestamp, int colCount,
+ private static void printHeader(boolean printTimestamp, boolean align, int colCount,
ResultSetMetaData resultSetMetaData) throws SQLException {
if (!printHeader) {
- printBlockLine(printTimestamp, colCount, resultSetMetaData);
- printName(printTimestamp, colCount, resultSetMetaData);
- printBlockLine(printTimestamp, colCount, resultSetMetaData);
+ printBlockLine(printTimestamp, align, colCount, resultSetMetaData);
+ printName(printTimestamp, align, colCount, resultSetMetaData);
+ printBlockLine(printTimestamp, align, colCount, resultSetMetaData);
printHeader = true;
}
}
@@ -261,19 +268,36 @@ public abstract class AbstractClient {
println();
}
- private static void printRowData(boolean printTimestamp, ResultSet res, ZoneId zoneId,
+ private static void printRowData(boolean printTimestamp, boolean align, ResultSet res, ZoneId zoneId,
ResultSetMetaData resultSetMetaData, int colCount)
throws SQLException {
if (displayCnt < maxPrintRowCount) { // NOTE displayCnt only works on queried data results
print("|");
- if (printTimestamp) {
- printf(formatTime, formatDatetime(res.getLong(TIMESTAMP_STR), zoneId));
- for (int i = 2; i <= colCount; i++) {
- printColumnData(resultSetMetaData, res, i, zoneId);
+ if (align) {
+ if (printTimestamp) {
+ printf(formatTime, formatDatetime(res.getLong(TIMESTAMP_STR), zoneId));
+ for (int i = 2; i <= colCount; i++) {
+ printColumnData(resultSetMetaData, true, res, i, zoneId);
+ }
+ } else {
+ for (int i = 1; i <= colCount; i++) {
+ printColumnData(resultSetMetaData, true, res, i, zoneId);
+ }
}
- } else {
- for (int i = 1; i <= colCount; i++) {
- printf(formatValue, res.getString(i));
+ }
+ else {
+ for (int i = START_PRINT_INDEX; i <= colCount / NO_ALIGN_PRINT_INTERVAL + 1; i++) {
+ if (printTimestamp) {
+ // timeLabel used for indicating the time column.
+ String timeLabel = TIMESTAMP_STR + resultSetMetaData.getColumnLabel(NO_ALIGN_PRINT_INTERVAL * i - START_PRINT_INDEX);
+ try {
+ printf(formatTime, formatDatetime(res.getLong(timeLabel), zoneId));
+ } catch (Exception e) {
+ printf(formatTime, "null");
+ handleException(e);
+ }
+ }
+ printColumnData(resultSetMetaData, false, res, i, zoneId);
}
}
println();
@@ -281,8 +305,8 @@ public abstract class AbstractClient {
}
}
- private static void printColumnData(ResultSetMetaData resultSetMetaData, ResultSet res, int i,
- ZoneId zoneId) throws SQLException {
+ private static void printColumnData(ResultSetMetaData resultSetMetaData, boolean align,
+ ResultSet res, int i, ZoneId zoneId) throws SQLException {
boolean flag = false;
for (String timeStr : AGGREGRATE_TIME_LIST) {
if (resultSetMetaData.getColumnLabel(i).toUpperCase().contains(timeStr.toUpperCase())) {
@@ -297,13 +321,24 @@ public abstract class AbstractClient {
printf(formatValue, "null");
handleException(e);
}
- } else {
+ }
+ else if (align) {
if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
printf("%" + deviceColumnLength + "s|", res.getString(i));
} else {
printf(formatValue, res.getString(i));
}
}
+ // for disable align clause
+ else {
+ if (res.getString(i * NO_ALIGN_PRINT_INTERVAL - START_PRINT_INDEX) == null) {
+ //blank space
+ printf(formatValue, "");
+ }
+ else {
+ printf(formatValue, res.getString(i * NO_ALIGN_PRINT_INTERVAL - START_PRINT_INDEX));
+ }
+ }
}
static Options createOptions() {
@@ -470,11 +505,15 @@ public abstract class AbstractClient {
}
}
- private static void printBlockLine(boolean printTimestamp, int colCount,
+ private static void printBlockLine(boolean printTimestamp, boolean align, int colCount,
ResultSetMetaData resultSetMetaData) throws SQLException {
StringBuilder blockLine = new StringBuilder();
- if (printTimestamp) {
- blockLine.append("+").append(StringUtils.repeat('-', maxTimeLength)).append("+");
+ if (align) {
+ if (printTimestamp) {
+ blockLine.append("+").append(StringUtils.repeat('-', maxTimeLength)).append("+");
+ } else {
+ blockLine.append("+");
+ }
if (resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
maxValueLength = measurementColumnLength;
} else {
@@ -485,37 +524,65 @@ public abstract class AbstractClient {
}
maxValueLength = tmp;
}
- for (int i = 2; i <= colCount; i++) {
- if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
- blockLine.append(StringUtils.repeat('-', deviceColumnLength)).append("+");
- } else {
- blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
+ if (printTimestamp) {
+ for (int i = 2; i <= colCount; i++) {
+ if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
+ blockLine.append(StringUtils.repeat('-', deviceColumnLength)).append("+");
+ } else {
+ blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
+ }
+ }
+ } else {
+ for (int i = 1; i <= colCount; i++) {
+ blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
}
}
- } else {
- blockLine.append("+");
+ }
+ // for disable align clause
+ else {
+ int tmp = Integer.MIN_VALUE;
for (int i = 1; i <= colCount; i++) {
+ int len = resultSetMetaData.getColumnLabel(i).length();
+ tmp = Math.max(tmp, len);
+ }
+ maxValueLength = tmp;
+ blockLine.append("+");
+ for (int i = 2; i <= colCount / 2 + 1; i++) {
+ if (printTimestamp) {
+ blockLine.append(StringUtils.repeat('-', maxTimeLength)).append("+");
+ }
blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
}
}
println(blockLine);
}
- private static void printName(boolean printTimestamp, int colCount,
+ private static void printName(boolean printTimestamp, boolean align, int colCount,
ResultSetMetaData resultSetMetaData) throws SQLException {
print("|");
formatValue = "%" + maxValueLength + "s|";
- if (printTimestamp) {
- printf(formatTime, TIMESTAMP_STR);
- for (int i = 2; i <= colCount; i++) {
- if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
- printf("%" + deviceColumnLength + "s|", resultSetMetaData.getColumnLabel(i));
- } else {
+ if (align) {
+ if (printTimestamp) {
+ printf(formatTime, TIMESTAMP_STR);
+ for (int i = 2; i <= colCount; i++) {
+ if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
+ printf("%" + deviceColumnLength + "s|", resultSetMetaData.getColumnLabel(i));
+ } else {
+ printf(formatValue, resultSetMetaData.getColumnLabel(i));
+ }
+ }
+ } else {
+ for (int i = 1; i <= colCount; i++) {
printf(formatValue, resultSetMetaData.getColumnLabel(i));
}
}
- } else {
- for (int i = 1; i <= colCount; i++) {
+ }
+ // for disable align
+ else {
+ for (int i = 2; i <= colCount; i += 2) {
+ if (printTimestamp) {
+ printf(formatTime, TIMESTAMP_STR);
+ }
printf(formatValue, resultSetMetaData.getColumnLabel(i));
}
}
diff --git a/docs/Documentation-CHN/UserGuide/5-Operation Manual/4-SQL Reference.md b/docs/Documentation-CHN/UserGuide/5-Operation Manual/4-SQL Reference.md
index b8ef160..cec69a9 100644
--- a/docs/Documentation-CHN/UserGuide/5-Operation Manual/4-SQL Reference.md
+++ b/docs/Documentation-CHN/UserGuide/5-Operation Manual/4-SQL Reference.md
@@ -323,6 +323,44 @@ Note: The order of <LIMITClause> and <SLIMITClause> does not affect the grammati
Note: <FillClause> can not use <LIMITClause> but not <SLIMITClause>.
```
+* Disable align语句
+```
+规则:
+1. 大小写均可.
+Correct example: select * from root.sg1 disable align
+Correct example: select * from root.sg1 DISABLE ALIGN
+
+2. Disable Align只能用于查询语句句尾.
+Correct example: select * from root.sg1 where time > 10 disable align
+Wrong example: select * from root.sg1 disable align where time > 10
+
+3. Disable Align 不能用于聚合查询、Fill语句、Group by或Group by device语句,但可用于Limit语句。
+Correct example: select * from root.sg1 limit 3 offset 2 disable align
+Correct example: select * from root.sg1 slimit 3 soffset 2 disable align
+Wrong example: select count(s0),count(s1) from root.sg1.d1 disable align
+Wrong example: select * from root.vehicle where root.vehicle.d0.s0>0 disable align
+Wrong example: select * from root.vehicle group by device disable align
+
+4. 结果显示若无数据显示为空白.
+
+查询结果样式如下表:
+| Time | root.sg.d0.s1 | Time | root.sg.d0.s2 | Time | root.sg.d1.s1 |
+| --- | --- | --- | --- | --- | --- |
+| 1 | 100 | 20 | 300 | 400 | 600 |
+| 2 | 300 | 40 | 800 | 700 | 900 |
+| 4 | 500 | | | 800 | 1000 |
+| | | | | 900 | 8000 |
+
+5. 一些正确使用样例:
+ - select * from root.vehicle disable align
+ - select s0,s0,s1 from root.vehicle.* disable align
+ - select s0,s1 from root.vehicle.* limit 10 offset 1 disable align
+ - select * from root.vehicle slimit 10 soffset 2 disable align
+ - select * from root.vehicle where time > 10 disable align
+
+
+```
+
### 数据库管理语句
* 创建用户
diff --git a/docs/Documentation/UserGuide/5-Operation Manual/4-SQL Reference.md b/docs/Documentation/UserGuide/5-Operation Manual/4-SQL Reference.md
index 985551c..cb1b49d 100644
--- a/docs/Documentation/UserGuide/5-Operation Manual/4-SQL Reference.md
+++ b/docs/Documentation/UserGuide/5-Operation Manual/4-SQL Reference.md
@@ -404,7 +404,44 @@ For example, "select s0,s0,s1 from root.sg.* group by device" is not equal to "s
- select sum(*) from root.vehicle GROUP BY (20ms,0,[2,50]) group by device
- select * from root.vehicle where time = 3 Fill(int32[previous, 5ms]) group by device
```
+* Disable Align Statement
+```
+Disable Align Clause: DISABLE ALIGN
+Rules:
+1. Both uppercase and lowercase are ok.
+Correct example: select * from root.sg1 disable align
+Correct example: select * from root.sg1 DISABLE ALIGN
+
+2. Disable Align Clause can only be used at the end of a query statement.
+Correct example: select * from root.sg1 where time > 10 disable align
+Wrong example: select * from root.sg1 disable align where time > 10
+
+3. Disable Align Clause cannot be used with Aggregation, Fill Statements, Group By or Group By Device Statements, but it can be used with Limit Statements.
+Correct example: select * from root.sg1 limit 3 offset 2 disable align
+Correct example: select * from root.sg1 slimit 3 soffset 2 disable align
+Wrong example: select count(s0),count(s1) from root.sg1.d1 disable align
+Wrong example: select * from root.vehicle where root.vehicle.d0.s0>0 disable align
+Wrong example: select * from root.vehicle group by device disable align
+
+4. A column (or row) is shown in the result table only if it contains data; cells for which no data exists are left empty.
+
+You could expect a table like:
+| Time | root.sg.d0.s1 | Time | root.sg.d0.s2 | Time | root.sg.d1.s1 |
+| --- | --- | --- | --- | --- | --- |
+| 1 | 100 | 20 | 300 | 400 | 600 |
+| 2 | 300 | 40 | 800 | 700 | 900 |
+| 4 | 500 | | | 800 | 1000 |
+| | | | | 900 | 8000 |
+
+5. More correct examples:
+ - select * from root.vehicle disable align
+ - select s0,s0,s1 from root.vehicle.* disable align
+ - select s0,s1 from root.vehicle.* limit 10 offset 1 disable align
+ - select * from root.vehicle slimit 10 soffset 2 disable align
+ - select * from root.vehicle where time > 10 disable align
+
+```
### Database Management Statement
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java
similarity index 80%
copy from jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
copy to jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java
index 3a478c7..b1c9e0e 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.jdbc;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.service.rpc.thrift.*;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
import java.io.InputStream;
@@ -33,80 +33,68 @@ import java.io.Reader;
import java.math.BigDecimal;
import java.math.MathContext;
import java.net.URL;
-import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.*;
import java.util.*;
-public class IoTDBQueryResultSet implements ResultSet {
-
- private static final String TIMESTAMP_STR = "Time";
- private static final int START_INDEX = 2;
- private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
- private Statement statement = null;
- private String sql;
- private SQLWarning warningChain = null;
- private boolean isClosed = false;
- private TSIService.Iface client = null;
- private List<String> columnInfoList; // no deduplication
- private List<String> columnTypeList; // no deduplication
- private Map<String, Integer> columnInfoMap; // used because the server returns deduplicated columns
- private List<String> columnTypeDeduplicatedList; // deduplicated from columnTypeList
- private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
- private int fetchSize;
- private boolean emptyResultSet = false;
-
- private TSQueryDataSet tsQueryDataSet = null;
- private byte[] time; // used to cache the current time value
- private byte[][] values; // used to cache the current row record value
- private byte[] currentBitmap; // used to cache the current bitmap for every column
- private static final int FLAG = 0x80; // used to do `and` operation with bitmap to judge whether the value is null
-
- private long sessionId;
- private long queryId;
- private boolean ignoreTimeStamp = false;
-
- public IoTDBQueryResultSet() {
- // do nothing
- }
-
- public IoTDBQueryResultSet(Statement statement, List<String> columnNameList,
- List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
- String sql, long queryId, long sessionId, TSQueryDataSet dataset)
- throws SQLException {
+public abstract class AbstractIoTDBResultSet implements ResultSet {
+
+ protected static final String TIMESTAMP_STR = "Time";
+ protected static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
+ protected static final int START_INDEX = 2;
+ protected Statement statement;
+ protected String sql;
+ protected SQLWarning warningChain = null;
+ protected boolean isClosed = false;
+ protected TSIService.Iface client;
+ protected List<String> columnNameList; // no deduplication
+ protected List<String> columnTypeList; // no deduplication
+ protected Map<String, Integer> columnOrdinalMap; // used because the server returns deduplicated columns
+ protected List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList
+ protected int fetchSize;
+ protected boolean emptyResultSet = false;
+
+
+ protected byte[][] values; // used to cache the current row record value
+
+
+ protected long sessionId;
+ protected long queryId;
+ protected boolean ignoreTimeStamp;
+
+
+ public AbstractIoTDBResultSet(Statement statement, List<String> columnNameList,
+ List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
+ String sql, long queryId, long sessionId)
+ throws SQLException {
this.statement = statement;
this.fetchSize = statement.getFetchSize();
this.columnTypeList = columnTypeList;
-
- time = new byte[Long.BYTES];
values = new byte[columnNameList.size()][];
- currentBitmap = new byte[columnNameList.size()];
- this.columnInfoList = new ArrayList<>();
+ this.columnNameList = new ArrayList<>();
if(!ignoreTimeStamp) {
- this.columnInfoList.add(TIMESTAMP_STR);
+ this.columnNameList.add(TIMESTAMP_STR);
}
// deduplicate and map
- this.columnInfoMap = new HashMap<>();
+ this.columnOrdinalMap = new HashMap<>();
if(!ignoreTimeStamp) {
- this.columnInfoMap.put(TIMESTAMP_STR, 1);
+ this.columnOrdinalMap.put(TIMESTAMP_STR, 1);
}
this.columnTypeDeduplicatedList = new ArrayList<>();
int index = START_INDEX;
for (int i = 0; i < columnNameList.size(); i++) {
String name = columnNameList.get(i);
- columnInfoList.add(name);
- if (!columnInfoMap.containsKey(name)) {
- columnInfoMap.put(name, index++);
- columnTypeDeduplicatedList.add(columnTypeList.get(i));
+ this.columnNameList.add(name);
+ if (!columnOrdinalMap.containsKey(name)) {
+ columnOrdinalMap.put(name, index++);
+ columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i)));
}
}
-
this.ignoreTimeStamp = ignoreTimeStamp;
this.client = client;
this.sql = sql;
this.queryId = queryId;
- this.tsQueryDataSet = dataset;
this.sessionId = sessionId;
}
@@ -157,10 +145,9 @@ public class IoTDBQueryResultSet implements ResultSet {
TSStatus closeResp = client.closeOperation(closeReq);
RpcUtils.verifySuccess(closeResp);
} catch (IoTDBRPCException e) {
- throw new SQLException("Error occurs for close opeation in server side becasuse " + e);
+ throw new SQLException("Error occurs for close opeation in server side becasuse ", e);
} catch (TException e) {
- throw new SQLException(
- "Error occurs when connecting to server for close operation, becasue: " + e);
+ throw new SQLException("Error occurs when connecting to server for close operation ", e);
}
}
client = null;
@@ -175,7 +162,7 @@ public class IoTDBQueryResultSet implements ResultSet {
@Override
public int findColumn(String columnName) {
- return columnInfoMap.get(columnName);
+ return columnOrdinalMap.get(columnName);
}
@Override
@@ -252,7 +239,7 @@ public class IoTDBQueryResultSet implements ResultSet {
@Override
public boolean getBoolean(String columnName) throws SQLException {
checkRecord();
- int index = columnInfoMap.get(columnName) - START_INDEX;
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
if (values[index] != null) {
return BytesUtils.bytesToBool(values[index]);
}
@@ -339,7 +326,7 @@ public class IoTDBQueryResultSet implements ResultSet {
@Override
public double getDouble(String columnName) throws SQLException {
checkRecord();
- int index = columnInfoMap.get(columnName) - START_INDEX;
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
if (values[index] != null) {
return BytesUtils.bytesToDouble(values[index]);
} else {
@@ -375,7 +362,7 @@ public class IoTDBQueryResultSet implements ResultSet {
@Override
public float getFloat(String columnName) throws SQLException {
checkRecord();
- int index = columnInfoMap.get(columnName) - START_INDEX;
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
if (values[index] != null) {
return BytesUtils.bytesToFloat(values[index]);
} else {
@@ -396,7 +383,7 @@ public class IoTDBQueryResultSet implements ResultSet {
@Override
public int getInt(String columnName) throws SQLException {
checkRecord();
- int index = columnInfoMap.get(columnName) - START_INDEX;
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
if (values[index] != null) {
return BytesUtils.bytesToInt(values[index]);
} else {
@@ -410,22 +397,11 @@ public class IoTDBQueryResultSet implements ResultSet {
}
@Override
- public long getLong(String columnName) throws SQLException {
- checkRecord();
- if (columnName.equals(TIMESTAMP_STR)) {
- return BytesUtils.bytesToLong(time);
- }
- int index = columnInfoMap.get(columnName) - START_INDEX;
- if (values[index] != null) {
- return BytesUtils.bytesToLong(values[index]);
- } else {
- throw new SQLException(String.format(VALUE_IS_NULL, columnName));
- }
- }
+ public abstract long getLong(String columnName) throws SQLException;
@Override
public ResultSetMetaData getMetaData() {
- return new IoTDBResultMetadata(columnInfoList, columnTypeList, ignoreTimeStamp);
+ return new IoTDBResultMetadata(columnNameList, columnTypeList, ignoreTimeStamp);
}
@Override
@@ -683,100 +659,11 @@ public class IoTDBQueryResultSet implements ResultSet {
/**
* @return true means has results
*/
- private boolean fetchResults() throws SQLException {
- rowsIndex = 0;
- TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId);
- try {
- TSFetchResultsResp resp = client.fetchResults(req);
-
- try {
- RpcUtils.verifySuccess(resp.getStatus());
- } catch (IoTDBRPCException e) {
- throw new IoTDBSQLException(e.getMessage(), resp.getStatus());
- }
- if (!resp.hasResultSet) {
- emptyResultSet = true;
- } else {
- tsQueryDataSet = resp.getQueryDataSet();
- }
- return resp.hasResultSet;
- } catch (TException e) {
- throw new SQLException(
- "Cannot fetch result from server, because of network connection: {} ", e);
- }
- }
+ abstract boolean fetchResults() throws SQLException;
- private boolean hasCachedResults() {
- return tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining();
- }
+ abstract boolean hasCachedResults();
- private void constructOneRow() {
- tsQueryDataSet.time.get(time);
- for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
- ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
- // another new 8 row, should move the bitmap buffer position to next byte
- if (rowsIndex % 8 == 0) {
- currentBitmap[i] = bitmapBuffer.get();
- }
- values[i] = null;
- if (!isNull(i, rowsIndex)) {
- ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
- TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
- switch (dataType) {
- case BOOLEAN:
- if (values[i] == null) {
- values[i] = new byte[1];
- }
- valueBuffer.get(values[i]);
- break;
- case INT32:
- if (values[i] == null) {
- values[i] = new byte[Integer.BYTES];
- }
- valueBuffer.get(values[i]);
- break;
- case INT64:
- if (values[i] == null) {
- values[i] = new byte[Long.BYTES];
- }
- valueBuffer.get(values[i]);
- break;
- case FLOAT:
- if (values[i] == null) {
- values[i] = new byte[Float.BYTES];
- }
- valueBuffer.get(values[i]);
- break;
- case DOUBLE:
- if (values[i] == null) {
- values[i] = new byte[Double.BYTES];
- }
- valueBuffer.get(values[i]);
- break;
- case TEXT:
- int length = valueBuffer.getInt();
- values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
- }
- }
- }
- rowsIndex++;
- }
-
- /**
- * judge whether the specified column value is null in the current position
- *
- * @param index series index
- * @param rowNum current position
- */
- private boolean isNull(int index, int rowNum) {
- byte bitmap = currentBitmap[index];
- int shift = rowNum % 8;
- return ((FLAG >>> shift) & bitmap) == 0;
- }
+ abstract void constructOneRow();
@Override
public boolean previous() throws SQLException {
@@ -1229,34 +1116,23 @@ public class IoTDBQueryResultSet implements ResultSet {
throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
}
- private void checkRecord() throws SQLException {
- if (Objects.isNull(tsQueryDataSet)) {
- throw new SQLException("No record remains");
- }
- }
+ abstract void checkRecord() throws SQLException;
private String findColumnNameByIndex(int columnIndex) throws SQLException {
if (columnIndex <= 0) {
throw new SQLException("column index should start from 1");
}
- if (columnIndex > columnInfoList.size()) {
+ if (columnIndex > columnNameList.size()) {
throw new SQLException(
- String.format("column index %d out of range %d", columnIndex, columnInfoList.size()));
+ String.format("column index %d out of range %d", columnIndex, columnNameList.size()));
}
- return columnInfoList.get(columnIndex - 1);
+ return columnNameList.get(columnIndex - 1);
}
- private String getValueByName(String columnName) throws SQLException {
- checkRecord();
- if (columnName.equals(TIMESTAMP_STR)) {
- return String.valueOf(BytesUtils.bytesToLong(time));
- }
- int index = columnInfoMap.get(columnName) - START_INDEX;
- if (index < 0 || index >= values.length || values[index] == null) {
- return null;
- }
- TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(index));
- switch (dataType) {
+ abstract String getValueByName(String columnName) throws SQLException;
+
+ protected String getString(int index, TSDataType tsDataType, byte[][] values) {
+ switch (tsDataType) {
case BOOLEAN:
return String.valueOf(BytesUtils.bytesToBool(values[index]));
case INT32:
@@ -1273,12 +1149,4 @@ public class IoTDBQueryResultSet implements ResultSet {
return null;
}
}
-
- public boolean isIgnoreTimeStamp() {
- return ignoreTimeStamp;
- }
-
- public void setIgnoreTimeStamp(boolean ignoreTimeStamp) {
- this.ignoreTimeStamp = ignoreTimeStamp;
- }
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
new file mode 100644
index 0000000..e71ccb9
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.thrift.TException;
+
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+
+public class IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet {
+
+ private static final int TIMESTAMP_STR_LENGTH = 4;
+ private static final String EMPTY_STR = "";
+
+ private TSQueryNonAlignDataSet tsQueryNonAlignDataSet = null;
+ private byte[][] times; // used for disable align
+
+  /**
+   * Creates the result set used for queries with the "disable align" clause.
+   *
+   * <p>The column metadata prepared by the super constructor is replaced: every requested
+   * series contributes two displayed columns, {@code TIMESTAMP_STR + name} followed by
+   * {@code name}. Distinct series names are mapped to ordinals starting at {@code START_INDEX},
+   * and {@code columnTypeDeduplicatedList} receives one data type per distinct name, in
+   * first-seen order.
+   *
+   * @param columnNameList requested series names, duplicates allowed
+   * @param columnTypeList data type names, positionally matching {@code columnNameList}
+   * @param dataset the first batch of non-aligned data
+   */
+  public IoTDBNonAlignQueryResultSet(Statement statement, List<String> columnNameList,
+      List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
+      String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset)
+      throws SQLException {
+    super(statement, columnNameList, columnTypeList, ignoreTimeStamp, client, sql, queryId,
+        sessionId);
+
+    final int seriesCount = columnNameList.size();
+    // one timestamp slot per requested series
+    times = new byte[seriesCount][Long.BYTES];
+
+    super.columnNameList = new ArrayList<>();
+    super.columnTypeDeduplicatedList = new ArrayList<>();
+    super.columnOrdinalMap = new HashMap<>();
+    super.columnOrdinalMap.put(TIMESTAMP_STR, 1);
+
+    int nextOrdinal = START_INDEX;
+    for (int pos = 0; pos < seriesCount; pos++) {
+      String seriesName = columnNameList.get(pos);
+      super.columnNameList.add(TIMESTAMP_STR + seriesName);
+      super.columnNameList.add(seriesName);
+      if (columnOrdinalMap.containsKey(seriesName)) {
+        // already registered: a repeated series shares the ordinal and type of its first use
+        continue;
+      }
+      columnOrdinalMap.put(seriesName, nextOrdinal++);
+      columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(pos)));
+    }
+    this.tsQueryNonAlignDataSet = dataset;
+  }
+
+  /**
+   * Returns the current row's entry for the given column as a {@code long}.
+   *
+   * <p>A label that starts with {@code TIMESTAMP_STR} addresses the timestamp of the series
+   * named by the remainder of the label; any other label addresses the series value itself.
+   *
+   * @param columnName a series name, or {@code TIMESTAMP_STR} followed by a series name
+   * @return the decoded long
+   * @throws SQLException if no data set is available, the column is unknown, or the series
+   *     has no entry in the current row
+   */
+  @Override
+  public long getLong(String columnName) throws SQLException {
+    checkRecord();
+    boolean isTimeColumn = columnName.startsWith(TIMESTAMP_STR);
+    String seriesName =
+        isTimeColumn ? columnName.substring(TIMESTAMP_STR_LENGTH) : columnName;
+    // look up as Integer first: unboxing a missing entry would throw a NullPointerException
+    Integer ordinal = columnOrdinalMap.get(seriesName);
+    if (ordinal == null) {
+      throw new SQLException(String.format("Unknown column name: %s", columnName));
+    }
+    int index = ordinal - START_INDEX;
+    byte[] raw = isTimeColumn ? times[index] : values[index];
+    // constructOneRow marks an exhausted series with a zero-length array, so treat that
+    // exactly like null instead of decoding it
+    if (raw == null || raw.length == 0) {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+    return BytesUtils.bytesToLong(raw);
+  }
+
+  /**
+   * Requests the next batch of non-aligned data from the server and stores it as the current
+   * data set.
+   *
+   * @return {@code true} if the server reported a result set and delivered a non-null
+   *     non-aligned data set, {@code false} otherwise
+   * @throws SQLException if the server reports a failure status or the RPC call fails
+   */
+  @Override
+  protected boolean fetchResults() throws SQLException {
+    // NOTE(review): the last argument is presumably the "align" flag (the aligned result set
+    // passes its align field in this position) - confirm against the thrift definition
+    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, false);
+    try {
+      TSFetchResultsResp resp = client.fetchResults(req);
+
+      try {
+        RpcUtils.verifySuccess(resp.getStatus());
+      } catch (IoTDBRPCException e) {
+        throw new IoTDBSQLException(e.getMessage(), resp.getStatus());
+      }
+      if (!resp.hasResultSet) {
+        // remember that the server has nothing more to deliver
+        emptyResultSet = true;
+      } else {
+        tsQueryNonAlignDataSet = resp.getNonAlignQueryDataSet();
+        if (tsQueryNonAlignDataSet == null) {
+          return false;
+        }
+      }
+      return resp.hasResultSet;
+    } catch (TException e) {
+      // SQLException does not expand "{}" placeholders, so append the cause's message
+      // explicitly and keep the exception itself as the cause
+      throw new SQLException(
+          "Cannot fetch result from server, because of network connection: " + e.getMessage(),
+          e);
+    }
+  }
+
+  /**
+   * Tells whether the current data set still holds unread rows.
+   *
+   * @return {@code true} if a data set is present and at least one of its time buffers has
+   *     bytes remaining
+   */
+  @Override
+  protected boolean hasCachedResults() {
+    if (tsQueryNonAlignDataSet == null) {
+      return false;
+    }
+    return hasTimesRemaining();
+  }
+
+  /**
+   * Checks the time buffers of the current data set (disable align clause).
+   *
+   * @return {@code true} as soon as any time buffer has bytes remaining
+   */
+  private boolean hasTimesRemaining() {
+    return tsQueryNonAlignDataSet.timeList.stream().anyMatch(ByteBuffer::hasRemaining);
+  }
+
+  /**
+   * Reads the next entry of every series from the current data set into {@code times} and
+   * {@code values}.
+   *
+   * <p>A series whose time buffer holds fewer than {@code Long.BYTES} bytes gets a
+   * {@code null} time and a zero-length value for this row. Otherwise its timestamp is read,
+   * followed by a value whose width depends on the series data type; TEXT values are
+   * preceded by an int length in the value buffer.
+   *
+   * @throws UnSupportedDataTypeException if a series has a data type not handled here
+   */
+  @Override
+  protected void constructOneRow() {
+    final int seriesCount = tsQueryNonAlignDataSet.timeList.size();
+    for (int col = 0; col < seriesCount; col++) {
+      times[col] = null;
+      values[col] = null;
+
+      ByteBuffer timeBuffer = tsQueryNonAlignDataSet.timeList.get(col);
+      if (timeBuffer.remaining() < Long.BYTES) {
+        // nothing left for this series: mark the value with an empty array
+        values[col] = EMPTY_STR.getBytes();
+        continue;
+      }
+
+      times[col] = new byte[Long.BYTES];
+      timeBuffer.get(times[col]);
+
+      ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(col);
+      TSDataType dataType = columnTypeDeduplicatedList.get(col);
+      if (dataType == TSDataType.TEXT) {
+        // variable-width value: length prefix first, then the bytes
+        int length = valueBuffer.getInt();
+        values[col] = ReadWriteIOUtils.readBytes(valueBuffer, length);
+        continue;
+      }
+
+      // every remaining supported type has a fixed width
+      final int width;
+      switch (dataType) {
+        case BOOLEAN:
+          width = 1;
+          break;
+        case INT32:
+          width = Integer.BYTES;
+          break;
+        case INT64:
+          width = Long.BYTES;
+          break;
+        case FLOAT:
+          width = Float.BYTES;
+          break;
+        case DOUBLE:
+          width = Double.BYTES;
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(col)));
+      }
+      values[col] = new byte[width];
+      valueBuffer.get(values[col]);
+    }
+  }
+
+  /**
+   * Ensures that a non-aligned data set is available before a value is read.
+   *
+   * @throws SQLException if there is no current data set
+   */
+  @Override
+  protected void checkRecord() throws SQLException {
+    if (tsQueryNonAlignDataSet != null) {
+      return;
+    }
+    throw new SQLException("No record remains");
+  }
+
+  /**
+   * Returns the current row's entry for the given column rendered as a string.
+   *
+   * <p>A label that starts with {@code TIMESTAMP_STR} addresses the timestamp of the series
+   * named by the remainder of the label; any other label addresses the series value.
+   *
+   * @param columnName a series name, or {@code TIMESTAMP_STR} followed by a series name
+   * @return the string form of the entry, or {@code null} if the series has no entry in the
+   *     current row
+   * @throws SQLException if no data set is available or the column is unknown
+   */
+  @Override
+  protected String getValueByName(String columnName) throws SQLException {
+    checkRecord();
+    boolean isTimeColumn = columnName.startsWith(TIMESTAMP_STR);
+    String seriesName =
+        isTimeColumn ? columnName.substring(TIMESTAMP_STR_LENGTH) : columnName;
+    // look up as Integer first: unboxing a missing entry would throw a NullPointerException
+    Integer ordinal = columnOrdinalMap.get(seriesName);
+    if (ordinal == null) {
+      throw new SQLException(String.format("Unknown column name: %s", columnName));
+    }
+    int index = ordinal - START_INDEX;
+    if (isTimeColumn) {
+      // same bounds and emptiness guard as the value path below
+      if (index < 0 || index >= times.length || times[index] == null
+          || times[index].length == 0) {
+        return null;
+      }
+      return String.valueOf(BytesUtils.bytesToLong(times[index]));
+    }
+    if (index < 0 || index >= values.length || values[index] == null
+        || values[index].length < 1) {
+      return null;
+    }
+    return getString(index, columnTypeDeduplicatedList.get(index), values);
+  }
+}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
index 3a478c7..d0f466e 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
@@ -21,1264 +21,183 @@ package org.apache.iotdb.jdbc;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.math.BigDecimal;
-import java.math.MathContext;
-import java.net.URL;
import java.nio.ByteBuffer;
-import java.sql.Date;
-import java.sql.*;
-import java.util.*;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Objects;
-public class IoTDBQueryResultSet implements ResultSet {
+public class IoTDBQueryResultSet extends AbstractIoTDBResultSet {
- private static final String TIMESTAMP_STR = "Time";
private static final int START_INDEX = 2;
private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
- private Statement statement = null;
- private String sql;
- private SQLWarning warningChain = null;
- private boolean isClosed = false;
- private TSIService.Iface client = null;
- private List<String> columnInfoList; // no deduplication
- private List<String> columnTypeList; // no deduplication
- private Map<String, Integer> columnInfoMap; // used because the server returns deduplicated columns
- private List<String> columnTypeDeduplicatedList; // deduplicated from columnTypeList
private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
- private int fetchSize;
- private boolean emptyResultSet = false;
+ private boolean align = true;
private TSQueryDataSet tsQueryDataSet = null;
private byte[] time; // used to cache the current time value
- private byte[][] values; // used to cache the current row record value
private byte[] currentBitmap; // used to cache the current bitmap for every column
private static final int FLAG = 0x80; // used to do `and` operation with bitmap to judge whether the value is null
- private long sessionId;
- private long queryId;
- private boolean ignoreTimeStamp = false;
-
- public IoTDBQueryResultSet() {
- // do nothing
- }
public IoTDBQueryResultSet(Statement statement, List<String> columnNameList,
List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
String sql, long queryId, long sessionId, TSQueryDataSet dataset)
throws SQLException {
- this.statement = statement;
- this.fetchSize = statement.getFetchSize();
- this.columnTypeList = columnTypeList;
-
+ super(statement, columnNameList, columnTypeList, ignoreTimeStamp, client, sql, queryId, sessionId);
time = new byte[Long.BYTES];
- values = new byte[columnNameList.size()][];
currentBitmap = new byte[columnNameList.size()];
-
- this.columnInfoList = new ArrayList<>();
- if(!ignoreTimeStamp) {
- this.columnInfoList.add(TIMESTAMP_STR);
- }
- // deduplicate and map
- this.columnInfoMap = new HashMap<>();
- if(!ignoreTimeStamp) {
- this.columnInfoMap.put(TIMESTAMP_STR, 1);
- }
- this.columnTypeDeduplicatedList = new ArrayList<>();
- int index = START_INDEX;
- for (int i = 0; i < columnNameList.size(); i++) {
- String name = columnNameList.get(i);
- columnInfoList.add(name);
- if (!columnInfoMap.containsKey(name)) {
- columnInfoMap.put(name, index++);
- columnTypeDeduplicatedList.add(columnTypeList.get(i));
- }
- }
-
- this.ignoreTimeStamp = ignoreTimeStamp;
- this.client = client;
- this.sql = sql;
- this.queryId = queryId;
this.tsQueryDataSet = dataset;
- this.sessionId = sessionId;
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean absolute(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void afterLast() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void beforeFirst() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void cancelRowUpdates() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void clearWarnings() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void close() throws SQLException {
- if (isClosed) {
- return;
- }
- if (client != null) {
- try {
- TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
- closeReq.setQueryId(queryId);
- TSStatus closeResp = client.closeOperation(closeReq);
- RpcUtils.verifySuccess(closeResp);
- } catch (IoTDBRPCException e) {
- throw new SQLException("Error occurs for close opeation in server side becasuse " + e);
- } catch (TException e) {
- throw new SQLException(
- "Error occurs when connecting to server for close operation, becasue: " + e);
- }
- }
- client = null;
- isClosed = true;
- }
-
-
- @Override
- public void deleteRow() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public int findColumn(String columnName) {
- return columnInfoMap.get(columnName);
- }
-
- @Override
- public boolean first() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Array getArray(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Array getArray(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public InputStream getAsciiStream(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public InputStream getAsciiStream(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
- return getBigDecimal(findColumnNameByIndex(columnIndex));
- }
-
- @Override
- public BigDecimal getBigDecimal(String columnName) throws SQLException {
- return new BigDecimal(Objects.requireNonNull(getValueByName(columnName)));
- }
-
- @Override
- public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
- MathContext mc = new MathContext(scale);
- return getBigDecimal(columnIndex).round(mc);
- }
-
- @Override
- public BigDecimal getBigDecimal(String columnName, int scale) throws SQLException {
- return getBigDecimal(findColumn(columnName), scale);
- }
-
- @Override
- public InputStream getBinaryStream(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public InputStream getBinaryStream(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Blob getBlob(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Blob getBlob(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
}
@Override
- public boolean getBoolean(int columnIndex) throws SQLException {
- return getBoolean(findColumnNameByIndex(columnIndex));
- }
-
- @Override
- public boolean getBoolean(String columnName) throws SQLException {
+ public long getLong(String columnName) throws SQLException {
checkRecord();
- int index = columnInfoMap.get(columnName) - START_INDEX;
- if (values[index] != null) {
- return BytesUtils.bytesToBool(values[index]);
- }
- else {
- throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+ if (columnName.equals(TIMESTAMP_STR)) {
+ return BytesUtils.bytesToLong(time);
}
- }
-
- @Override
- public byte getByte(int columnIndex) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public byte getByte(String columnName) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public byte[] getBytes(int columnIndex) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public byte[] getBytes(String columnName) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Reader getCharacterStream(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Reader getCharacterStream(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Clob getClob(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Clob getClob(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public int getConcurrency() {
- return ResultSet.CONCUR_READ_ONLY;
- }
-
- @Override
- public String getCursorName() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Date getDate(int columnIndex) throws SQLException {
- return new Date(getLong(columnIndex));
- }
-
- @Override
- public Date getDate(String columnName) throws SQLException {
- return getDate(findColumn(columnName));
- }
-
- @Override
- public Date getDate(int arg0, Calendar arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Date getDate(String arg0, Calendar arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public double getDouble(int columnIndex) throws SQLException {
- return getDouble(findColumnNameByIndex(columnIndex));
- }
-
- @Override
- public double getDouble(String columnName) throws SQLException {
- checkRecord();
- int index = columnInfoMap.get(columnName) - START_INDEX;
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
if (values[index] != null) {
- return BytesUtils.bytesToDouble(values[index]);
+ return BytesUtils.bytesToLong(values[index]);
} else {
throw new SQLException(String.format(VALUE_IS_NULL, columnName));
}
}
@Override
- public int getFetchDirection() throws SQLException {
- return ResultSet.FETCH_FORWARD;
- }
-
- @Override
- public void setFetchDirection(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public int getFetchSize() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
+ protected boolean fetchResults() throws SQLException {
+ rowsIndex = 0;
+ TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, align);
+ try {
+ TSFetchResultsResp resp = client.fetchResults(req);
- @Override
- public void setFetchSize(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ try {
+ RpcUtils.verifySuccess(resp.getStatus());
+ } catch (IoTDBRPCException e) {
+ throw new IoTDBSQLException(e.getMessage(), resp.getStatus());
+ }
+ if (!resp.hasResultSet) {
+ emptyResultSet = true;
+ } else {
+ tsQueryDataSet = resp.getQueryDataSet();
+ }
+ return resp.hasResultSet;
+ } catch (TException e) {
+ throw new SQLException(
+ "Cannot fetch result from server, because of network connection: {} ", e);
+ }
}
@Override
- public float getFloat(int columnIndex) throws SQLException {
- return getFloat(findColumnNameByIndex(columnIndex));
+ protected boolean hasCachedResults() {
+ return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining());
}
@Override
- public float getFloat(String columnName) throws SQLException {
- checkRecord();
- int index = columnInfoMap.get(columnName) - START_INDEX;
- if (values[index] != null) {
- return BytesUtils.bytesToFloat(values[index]);
- } else {
- throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+ protected void constructOneRow() {
+ tsQueryDataSet.time.get(time);
+ for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
+ ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
+ // another new 8 row, should move the bitmap buffer position to next byte
+ if (rowsIndex % 8 == 0) {
+ currentBitmap[i] = bitmapBuffer.get();
+ }
+ values[i] = null;
+ if (!isNull(i, rowsIndex)) {
+ ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
+ TSDataType dataType = columnTypeDeduplicatedList.get(i);
+ switch (dataType) {
+ case BOOLEAN:
+ if (values[i] == null) {
+ values[i] = new byte[1];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case INT32:
+ if (values[i] == null) {
+ values[i] = new byte[Integer.BYTES];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case INT64:
+ if (values[i] == null) {
+ values[i] = new byte[Long.BYTES];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case FLOAT:
+ if (values[i] == null) {
+ values[i] = new byte[Float.BYTES];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case DOUBLE:
+ if (values[i] == null) {
+ values[i] = new byte[Double.BYTES];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case TEXT:
+ int length = valueBuffer.getInt();
+ values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
+ }
+ }
}
+ rowsIndex++;
}
- @Override
- public int getHoldability() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public int getInt(int columnIndex) throws SQLException {
- return getInt(findColumnNameByIndex(columnIndex));
+ /**
+ * judge whether the specified column value is null in the current position
+ *
+ * @param index series index
+ * @param rowNum current position
+ */
+ private boolean isNull(int index, int rowNum) {
+ byte bitmap = currentBitmap[index];
+ int shift = rowNum % 8;
+ return ((FLAG >>> shift) & bitmap) == 0;
}
@Override
- public int getInt(String columnName) throws SQLException {
- checkRecord();
- int index = columnInfoMap.get(columnName) - START_INDEX;
- if (values[index] != null) {
- return BytesUtils.bytesToInt(values[index]);
- } else {
- throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+ protected void checkRecord() throws SQLException {
+ if (Objects.isNull(tsQueryDataSet)) {
+ throw new SQLException("No record remains");
}
}
@Override
- public long getLong(int columnIndex) throws SQLException {
- return getLong(findColumnNameByIndex(columnIndex));
- }
-
- @Override
- public long getLong(String columnName) throws SQLException {
+ protected String getValueByName(String columnName) throws SQLException {
checkRecord();
if (columnName.equals(TIMESTAMP_STR)) {
- return BytesUtils.bytesToLong(time);
- }
- int index = columnInfoMap.get(columnName) - START_INDEX;
- if (values[index] != null) {
- return BytesUtils.bytesToLong(values[index]);
- } else {
- throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+ return String.valueOf(BytesUtils.bytesToLong(time));
}
- }
-
- @Override
- public ResultSetMetaData getMetaData() {
- return new IoTDBResultMetadata(columnInfoList, columnTypeList, ignoreTimeStamp);
- }
-
- @Override
- public Reader getNCharacterStream(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Reader getNCharacterStream(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public NClob getNClob(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public NClob getNClob(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public String getNString(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public String getNString(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Object getObject(int columnIndex) throws SQLException {
- return getObject(findColumnNameByIndex(columnIndex));
- }
-
- @Override
- public Object getObject(String columnName) throws SQLException {
- return getValueByName(columnName);
- }
-
- @Override
- public Object getObject(int arg0, Map<String, Class<?>> arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Object getObject(String arg0, Map<String, Class<?>> arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public <T> T getObject(int arg0, Class<T> arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public <T> T getObject(String arg0, Class<T> arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Ref getRef(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Ref getRef(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public int getRow() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public RowId getRowId(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public RowId getRowId(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public SQLXML getSQLXML(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public SQLXML getSQLXML(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public short getShort(int columnIndex) throws SQLException {
- return getShort(findColumnNameByIndex(columnIndex));
- }
-
- @Override
- public short getShort(String columnName) throws SQLException {
- return Short.parseShort(Objects.requireNonNull(getValueByName(columnName)));
- }
-
- @Override
- public Statement getStatement() {
- return this.statement;
- }
-
- @Override
- public String getString(int columnIndex) throws SQLException {
- return getString(findColumnNameByIndex(columnIndex));
- }
-
- @Override
- public String getString(String columnName) throws SQLException {
- return getValueByName(columnName);
- }
-
- @Override
- public Time getTime(int columnIndex) throws SQLException {
- return new Time(getLong(columnIndex));
- }
-
- @Override
- public Time getTime(String columnName) throws SQLException {
- return getTime(findColumn(columnName));
- }
-
- @Override
- public Time getTime(int arg0, Calendar arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Time getTime(String arg0, Calendar arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Timestamp getTimestamp(int columnIndex) throws SQLException {
- return new Timestamp(getLong(columnIndex));
- }
-
- @Override
- public Timestamp getTimestamp(String columnName) throws SQLException {
- return getTimestamp(findColumn(columnName));
- }
-
- @Override
- public Timestamp getTimestamp(int arg0, Calendar arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public Timestamp getTimestamp(String arg0, Calendar arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public int getType() {
- return ResultSet.TYPE_FORWARD_ONLY;
- }
-
- @Override
- public URL getURL(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public URL getURL(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public InputStream getUnicodeStream(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public InputStream getUnicodeStream(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public SQLWarning getWarnings() {
- return warningChain;
- }
-
- @Override
- public void insertRow() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean isAfterLast() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean isBeforeFirst() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean isClosed() {
- return isClosed;
- }
-
- @Override
- public boolean isFirst() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean isLast() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean last() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void moveToCurrentRow() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void moveToInsertRow() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean next() throws SQLException {
- if (hasCachedResults()) {
- constructOneRow();
- return true;
- }
- if (emptyResultSet) {
- return false;
- }
- if (fetchResults()) {
- constructOneRow();
- return true;
- }
- return false;
- }
-
-
- /**
- * @return true means has results
- */
- private boolean fetchResults() throws SQLException {
- rowsIndex = 0;
- TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId);
- try {
- TSFetchResultsResp resp = client.fetchResults(req);
-
- try {
- RpcUtils.verifySuccess(resp.getStatus());
- } catch (IoTDBRPCException e) {
- throw new IoTDBSQLException(e.getMessage(), resp.getStatus());
- }
- if (!resp.hasResultSet) {
- emptyResultSet = true;
- } else {
- tsQueryDataSet = resp.getQueryDataSet();
- }
- return resp.hasResultSet;
- } catch (TException e) {
- throw new SQLException(
- "Cannot fetch result from server, because of network connection: {} ", e);
- }
- }
-
- private boolean hasCachedResults() {
- return tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining();
- }
-
- private void constructOneRow() {
- tsQueryDataSet.time.get(time);
- for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
- ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
- // another new 8 row, should move the bitmap buffer position to next byte
- if (rowsIndex % 8 == 0) {
- currentBitmap[i] = bitmapBuffer.get();
- }
- values[i] = null;
- if (!isNull(i, rowsIndex)) {
- ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
- TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
- switch (dataType) {
- case BOOLEAN:
- if (values[i] == null) {
- values[i] = new byte[1];
- }
- valueBuffer.get(values[i]);
- break;
- case INT32:
- if (values[i] == null) {
- values[i] = new byte[Integer.BYTES];
- }
- valueBuffer.get(values[i]);
- break;
- case INT64:
- if (values[i] == null) {
- values[i] = new byte[Long.BYTES];
- }
- valueBuffer.get(values[i]);
- break;
- case FLOAT:
- if (values[i] == null) {
- values[i] = new byte[Float.BYTES];
- }
- valueBuffer.get(values[i]);
- break;
- case DOUBLE:
- if (values[i] == null) {
- values[i] = new byte[Double.BYTES];
- }
- valueBuffer.get(values[i]);
- break;
- case TEXT:
- int length = valueBuffer.getInt();
- values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
- }
- }
- }
- rowsIndex++;
- }
-
- /**
- * judge whether the specified column value is null in the current position
- *
- * @param index series index
- * @param rowNum current position
- */
- private boolean isNull(int index, int rowNum) {
- byte bitmap = currentBitmap[index];
- int shift = rowNum % 8;
- return ((FLAG >>> shift) & bitmap) == 0;
- }
-
- @Override
- public boolean previous() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void refreshRow() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean relative(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean rowDeleted() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean rowInserted() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean rowUpdated() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateArray(int arg0, Array arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateArray(String arg0, Array arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateAsciiStream(int arg0, InputStream arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateAsciiStream(String arg0, InputStream arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateAsciiStream(int arg0, InputStream arg1, int arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateAsciiStream(String arg0, InputStream arg1, int arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateAsciiStream(int arg0, InputStream arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateAsciiStream(String arg0, InputStream arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBigDecimal(int arg0, BigDecimal arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBigDecimal(String arg0, BigDecimal arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBinaryStream(int arg0, InputStream arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBinaryStream(String arg0, InputStream arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBinaryStream(int arg0, InputStream arg1, int arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBinaryStream(String arg0, InputStream arg1, int arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBinaryStream(int arg0, InputStream arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBinaryStream(String arg0, InputStream arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBlob(int arg0, Blob arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBlob(String arg0, Blob arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBlob(int arg0, InputStream arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBlob(String arg0, InputStream arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBlob(int arg0, InputStream arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBlob(String arg0, InputStream arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBoolean(int arg0, boolean arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBoolean(String arg0, boolean arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateByte(int arg0, byte arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateByte(String arg0, byte arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBytes(int arg0, byte[] arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateBytes(String arg0, byte[] arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateCharacterStream(int arg0, Reader arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateCharacterStream(String arg0, Reader arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateCharacterStream(int arg0, Reader arg1, int arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateCharacterStream(String arg0, Reader arg1, int arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateCharacterStream(int arg0, Reader arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateCharacterStream(String arg0, Reader arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateClob(int arg0, Clob arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateClob(String arg0, Clob arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateClob(int arg0, Reader arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateClob(String arg0, Reader arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateClob(int arg0, Reader arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
-
- }
-
- @Override
- public void updateClob(String arg0, Reader arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateDate(int arg0, Date arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateDate(String arg0, Date arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateDouble(int arg0, double arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateDouble(String arg0, double arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateFloat(int arg0, float arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateFloat(String arg0, float arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateInt(int arg0, int arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateInt(String arg0, int arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateLong(int arg0, long arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateLong(String arg0, long arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNCharacterStream(int arg0, Reader arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNCharacterStream(String arg0, Reader arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNCharacterStream(int arg0, Reader arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNCharacterStream(String arg0, Reader arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNClob(int arg0, NClob arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNClob(String arg0, NClob arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNClob(int arg0, Reader arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNClob(String arg0, Reader arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNClob(int arg0, Reader arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNClob(String arg0, Reader arg1, long arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNString(int arg0, String arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNString(String arg0, String arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNull(int arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateNull(String arg0) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateObject(int arg0, Object arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateObject(String arg0, Object arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateObject(int arg0, Object arg1, int arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateObject(String arg0, Object arg1, int arg2) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateRef(int arg0, Ref arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateRef(String arg0, Ref arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateRow() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateRowId(int arg0, RowId arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateRowId(String arg0, RowId arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateSQLXML(int arg0, SQLXML arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateSQLXML(String arg0, SQLXML arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateShort(int arg0, short arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateShort(String arg0, short arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateString(int arg0, String arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateString(String arg0, String arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateTime(int arg0, Time arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateTime(String arg0, Time arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateTimestamp(int arg0, Timestamp arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public void updateTimestamp(String arg0, Timestamp arg1) throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- @Override
- public boolean wasNull() throws SQLException {
- throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
- }
-
- private void checkRecord() throws SQLException {
- if (Objects.isNull(tsQueryDataSet)) {
- throw new SQLException("No record remains");
- }
- }
-
- private String findColumnNameByIndex(int columnIndex) throws SQLException {
- if (columnIndex <= 0) {
- throw new SQLException("column index should start from 1");
- }
- if (columnIndex > columnInfoList.size()) {
- throw new SQLException(
- String.format("column index %d out of range %d", columnIndex, columnInfoList.size()));
- }
- return columnInfoList.get(columnIndex - 1);
- }
-
- private String getValueByName(String columnName) throws SQLException {
- checkRecord();
- if (columnName.equals(TIMESTAMP_STR)) {
- return String.valueOf(BytesUtils.bytesToLong(time));
- }
- int index = columnInfoMap.get(columnName) - START_INDEX;
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
if (index < 0 || index >= values.length || values[index] == null) {
return null;
}
- TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(index));
- switch (dataType) {
- case BOOLEAN:
- return String.valueOf(BytesUtils.bytesToBool(values[index]));
- case INT32:
- return String.valueOf(BytesUtils.bytesToInt(values[index]));
- case INT64:
- return String.valueOf(BytesUtils.bytesToLong(values[index]));
- case FLOAT:
- return String.valueOf(BytesUtils.bytesToFloat(values[index]));
- case DOUBLE:
- return String.valueOf(BytesUtils.bytesToDouble(values[index]));
- case TEXT:
- return new String(values[index]);
- default:
- return null;
- }
+ return getString(index, columnTypeDeduplicatedList.get(index), values);
}
public boolean isIgnoreTimeStamp() {
return ignoreTimeStamp;
}
- public void setIgnoreTimeStamp(boolean ignoreTimeStamp) {
- this.ignoreTimeStamp = ignoreTimeStamp;
+
+ public boolean isAlign() {
+ return align;
}
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 7fedfdb..542668c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -224,9 +224,16 @@ public class IoTDBStatement implements Statement {
}
if (execResp.isSetColumns()) {
queryId = execResp.getQueryId();
- this.resultSet = new IoTDBQueryResultSet(this,
- execResp.getColumns(), execResp.getDataTypeList(),
- execResp.ignoreTimeStamp, client, sql, queryId, sessionId, execResp.queryDataSet);
+ if (execResp.queryDataSet == null) {
+ this.resultSet = new IoTDBNonAlignQueryResultSet(this, execResp.getColumns(),
+ execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, sql, queryId,
+ sessionId, execResp.nonAlignQueryDataSet);
+ }
+ else {
+ this.resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(),
+ execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, sql, queryId,
+ sessionId, execResp.queryDataSet);
+ }
return true;
}
return false;
@@ -323,9 +330,16 @@ public class IoTDBStatement implements Statement {
} catch (IoTDBRPCException e) {
throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
}
- this.resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(),
- execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, sql, queryId,
- sessionId, execResp.queryDataSet);
+ if (execResp.queryDataSet == null) {
+ this.resultSet = new IoTDBNonAlignQueryResultSet(this, execResp.getColumns(),
+ execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, sql, queryId,
+ sessionId, execResp.nonAlignQueryDataSet);
+ }
+ else {
+ this.resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(),
+ execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, sql, queryId,
+ sessionId, execResp.queryDataSet);
+ }
return resultSet;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index 479849a..e4e8e31 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@ -18,25 +18,21 @@
*/
package org.apache.iotdb.db.conf;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Properties;
-
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.*;
+import java.util.Properties;
+
public class IoTDBConfigCheck {
// this file is located in data/system/schema/system_properties.
// If user delete folder "data", system_properties can reset.
public static final String PROPERTIES_FILE_NAME = "system.properties";
public static final String SCHEMA_DIR =
- IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
+ IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
private static final IoTDBConfigCheck INSTANCE = new IoTDBConfigCheck();
private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class);
// this is a initial parameter.
@@ -53,14 +49,14 @@ public class IoTDBConfigCheck {
// check time stamp precision
if (!(TIMESTAMP_PRECISION.equals("ms") || TIMESTAMP_PRECISION.equals("us")
- || TIMESTAMP_PRECISION.equals("ns"))) {
+ || TIMESTAMP_PRECISION.equals("ns"))) {
logger.error("Wrong timestamp precision, please set as: ms, us or ns ! Current is: "
- + TIMESTAMP_PRECISION);
+ + TIMESTAMP_PRECISION);
System.exit(-1);
}
PARTITION_INTERVAL = IoTDBDescriptor.getInstance().getConfig()
- .getPartitionInterval();
+ .getPartitionInterval();
// check partition interval
if (PARTITION_INTERVAL <= 0) {
@@ -85,7 +81,7 @@ public class IoTDBConfigCheck {
// create file : read timestamp precision from engine.properties, create system_properties.txt
// use output stream to write timestamp precision to file.
File file = SystemFileFactory.INSTANCE
- .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
+ .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
try {
if (!file.exists()) {
file.createNewFile();
@@ -101,18 +97,18 @@ public class IoTDBConfigCheck {
}
// get existed properties from system_properties.txt
File inputFile = SystemFileFactory.INSTANCE
- .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
+ .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
try (FileInputStream inputStream = new FileInputStream(inputFile.toString())) {
properties.load(new InputStreamReader(inputStream, TSFileConfig.STRING_CHARSET));
if (!properties.getProperty("timestamp_precision").equals(TIMESTAMP_PRECISION)) {
logger.error("Wrong timestamp precision, please set as: " + properties
- .getProperty("timestamp_precision") + " !");
+ .getProperty("timestamp_precision") + " !");
System.exit(-1);
}
if (!(Long.parseLong(properties.getProperty("storage_group_time_range"))
- == PARTITION_INTERVAL)) {
+ == PARTITION_INTERVAL)) {
logger.error("Wrong storage group time range, please set as: " + properties
- .getProperty("storage_group_time_range") + " !");
+ .getProperty("storage_group_time_range") + " !");
System.exit(-1);
}
} catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index f88ea37..11f452a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -45,6 +45,7 @@ public class QueryPlan extends PhysicalPlan {
private int rowLimit = 0;
private int rowOffset = 0;
+ private boolean isAlign = true; // for disable align sql
private boolean isGroupByDevice = false; // for group by device sql
private List<String> measurements; // for group by device sql, e.g. temperature
private Map<String, Set<String>> measurementsGroupByDevice; // for group by device sql, e.g. root.ln.d1 -> temperature
@@ -140,6 +141,14 @@ public class QueryPlan extends PhysicalPlan {
public void setGroupByDevice(boolean groupByDevice) {
isGroupByDevice = groupByDevice;
}
+
+ public boolean isAlign() { // flag for the "disable align" sql; the backing field is initialized to true
+ return isAlign;
+ }
+
+ public void setAlign(boolean align) { // overwrites the align flag that isAlign() returns
+ isAlign = align;
+ }
public void setMeasurements(List<String> measurements) {
this.measurements = measurements;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 47b09af..8b4d8f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -204,7 +204,6 @@ public class PhysicalGenerator {
} else {
queryPlan = new QueryPlan();
}
-
if (queryOperator.isGroupByDevice()) {
// below is the core realization of GROUP_BY_DEVICE sql logic
List<Path> prefixPaths = queryOperator.getFromOperator().getPrefixPaths();
@@ -323,6 +322,7 @@ public class PhysicalGenerator {
queryPlan.setDeviceToFilterMap(concatFilterByDivice(prefixPaths, filterOperator));
}
} else {
+ queryPlan.setAlign(queryOperator.isAlign());
List<Path> paths = queryOperator.getSelectedPaths();
queryPlan.setPaths(paths);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
index d417b1b..3208378 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
@@ -180,7 +180,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet {
bitmapBAOSList[seriesIndex] = new PublicBAOS();
}
- // used to record a bitmap for every 8 row record
+ // used to record a bitmap for every 8 row records
int[] currentBitmapList = new int[seriesNum];
int rowCount = 0;
while (rowCount < fetchSize) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
new file mode 100644
index 0000000..46a8b3a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+
+ private class ReadTask implements Runnable { // producer task: reads one series and queues serialized (time, value) buffer pairs
+
+ private final ManagedSeriesReader reader; // series reader drained by this task; also the monitor locked in run()
+ private BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue; // output queue of (time buffer, value buffer) pairs
+ private WatermarkEncoder encoder; // may be null; INT32/INT64/FLOAT/DOUBLE values go through it when needEncode(time)
+ private int index; // this series' slot in offsetArray, alreadyReturnedRowNumArray and cachedBatchData
+
+
+ public ReadTask(ManagedSeriesReader reader,
+ BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue,
+ WatermarkEncoder encoder, int index) {
+ this.reader = reader;
+ this.blockingQueue = blockingQueue;
+ this.encoder = encoder;
+ this.index = index;
+ }
+
+ @Override
+ public void run() { // serializes at most fetchSize points of this series and puts them on the queue as one pair
+ PublicBAOS timeBAOS = new PublicBAOS();
+ PublicBAOS valueBAOS = new PublicBAOS();
+ try {
+ synchronized (reader) {
+ // if the task is submitted, there must be free space in the queue
+ // so here we don't need to check whether the queue has free space
+ // proceed if a cached batch still has data or the reader has a next batch
+ if ((cachedBatchData[index] != null && cachedBatchData[index].hasCurrent())
+ || reader.hasNextBatch()) {
+ BatchData batchData;
+ if (cachedBatchData[index] != null && cachedBatchData[index].hasCurrent()) {
+ batchData = cachedBatchData[index]; // resume the batch left over from an earlier run
+ }
+ else {
+ batchData = reader.nextBatch(); // NOTE(review): not stored in cachedBatchData - confirm leftover points are not dropped when fetchSize is hit mid-batch
+ }
+ int rowCount = 0; // number of points written to the buffers in this run
+ while (rowCount < fetchSize) {
+
+ if ((limit > 0 && alreadyReturnedRowNumArray.get(index) >= limit)) { // row limit reached for this series
+ break;
+ }
+
+ if (batchData != null && batchData.hasCurrent()) {
+ if (offsetArray.get(index) == 0) { // offset already consumed: serialize this point
+ long time = batchData.currentTime();
+ ReadWriteIOUtils.write(time, timeBAOS);
+ TSDataType type = batchData.getDataType();
+ switch (type) {
+ case INT32:
+ int intValue = batchData.getInt();
+ if (encoder != null && encoder.needEncode(time)) {
+ intValue = encoder.encodeInt(intValue, time);
+ }
+ ReadWriteIOUtils.write(intValue, valueBAOS);
+ break;
+ case INT64:
+ long longValue = batchData.getLong();
+ if (encoder != null && encoder.needEncode(time)) {
+ longValue = encoder.encodeLong(longValue, time);
+ }
+ ReadWriteIOUtils.write(longValue, valueBAOS);
+ break;
+ case FLOAT:
+ float floatValue = batchData.getFloat();
+ if (encoder != null && encoder.needEncode(time)) {
+ floatValue = encoder.encodeFloat(floatValue, time);
+ }
+ ReadWriteIOUtils.write(floatValue, valueBAOS);
+ break;
+ case DOUBLE:
+ double doubleValue = batchData.getDouble();
+ if (encoder != null && encoder.needEncode(time)) {
+ doubleValue = encoder.encodeDouble(doubleValue, time);
+ }
+ ReadWriteIOUtils.write(doubleValue, valueBAOS);
+ break;
+ case BOOLEAN:
+ ReadWriteIOUtils.write(batchData.getBoolean(),
+ valueBAOS);
+ break;
+ case TEXT:
+ ReadWriteIOUtils
+ .write(batchData.getBinary(),
+ valueBAOS);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
+ }
+ batchData.next(); // advance even when the point was skipped because of the offset
+ }
+ else {
+ if (reader.hasNextBatch()) { // current batch exhausted: fetch and cache the next one
+ batchData = reader.nextBatch();
+ cachedBatchData[index] = batchData;
+ continue;
+ }
+ else {
+ break;
+ }
+ }
+ if (offsetArray.get(index) == 0) {
+ rowCount++;
+ if (limit > 0) {
+ alreadyReturnedRowNumArray.incrementAndGet(index);
+ }
+ } else {
+ offsetArray.decrementAndGet(index); // one more point skipped towards the offset
+ }
+ }
+ if (rowCount == 0) {
+ blockingQueue.put(new Pair(null, null)); // presumably the end-of-data marker for the consumer - TODO confirm against fillBuffer
+ // set the hasRemaining field in reader to false
+ // tell the Consumer not to submit another task for this reader any more
+ reader.setHasRemaining(false);
+ // remove itself from the QueryTaskPoolManager
+ reader.setManagedByQueryManager(false);
+ return;
+ }
+
+ ByteBuffer timeBuffer = ByteBuffer.wrap(timeBAOS.getBuf());
+ timeBuffer.limit(timeBAOS.size()); // cap the buffer at the stream's reported size
+ ByteBuffer valueBuffer = ByteBuffer.wrap(valueBAOS.getBuf());
+ valueBuffer.limit(valueBAOS.size());
+
+ Pair<ByteBuffer, ByteBuffer> timeValueBAOSPair = new Pair(timeBuffer, valueBuffer);
+
+ blockingQueue.put(timeValueBAOSPair);
+ // if the queue also has free space, just submit another itself
+ if (blockingQueue.remainingCapacity() > 0) {
+ pool.submit(this); // re-submit this same task to produce the next pair
+ }
+ // the queue has no more space
+ // remove itself from the QueryTaskPoolManager
+ else {
+ reader.setManagedByQueryManager(false);
+ }
+ return;
+ }
+ blockingQueue.put(new Pair(null, null)); // no cached data and no next batch: same (null, null) pair as above
+ // set the hasRemaining field in reader to false
+ // tell the Consumer not to submit another task for this reader any more
+ reader.setHasRemaining(false);
+ // remove itself from the QueryTaskPoolManager
+ reader.setManagedByQueryManager(false);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+ Thread.currentThread().interrupt(); // restore the interrupt flag for the pool thread
+ } catch (IOException e) { // NOTE(review): nothing is queued on this path - confirm the consumer cannot block waiting for this series
+ LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+ } catch (Exception e) { // NOTE(review): same concern as the IOException path; the error is only logged
+ LOGGER.error("Something gets wrong: ", e);
+ }
+
+ }
+
+ }
+
+
+ private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
+
+ // Blocking queue list for each time value buffer pair
+ private BlockingQueue<Pair<ByteBuffer, ByteBuffer>>[] blockingQueueArray;
+
+ private boolean initialized = false;
+
+ private AtomicIntegerArray offsetArray;
+
+ private int limit;
+
+ private AtomicIntegerArray alreadyReturnedRowNumArray;
+
+ private BatchData[] cachedBatchData;
+
+ // indicates that there is no more batch data in the corresponding queue,
+ // in case the consumer thread is blocked on the queue and would never become runnable again.
+ // this field is not the same as the `hasRemaining` in SeriesReaderWithoutValueFilter:
+ // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false,
+ // noMoreDataInQueue can still be true.
+ // its usage is to tell the consumer thread not to call the take() method.
+ private boolean[] noMoreDataInQueueArray;
+
+ private int fetchSize;
+
+ // indicate that there is no more batch data in the corresponding queue
+ // in case that the consumer thread is blocked on the queue and won't get runnable any more
+ // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+ // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+ // noMoreDataInQueue can still be true
+ // its usage is to tell the consumer thread not to call the take() method.
+
+ // capacity for blocking queue
+ private static final int BLOCKING_QUEUE_CAPACITY = 5;
+
+ private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
+
+ /**
+ * constructor of NonAlignEngineDataSet; creates one bounded blocking queue per reader.
+ *
+ * @param paths paths in List structure
+ * @param dataTypes time series data type
+ * @param readers readers in List(ManagedSeriesReader) structure
+ */
+ public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
+ List<ManagedSeriesReader> readers) {
+ super(paths, dataTypes);
+ this.seriesReaderWithoutValueFilterList = readers;
+ blockingQueueArray = new BlockingQueue[readers.size()];
+ noMoreDataInQueueArray = new boolean[readers.size()];
+ for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+ blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY); // each queue holds at most BLOCKING_QUEUE_CAPACITY pairs
+ }
+ }
+
+  /**
+   * Allocates the per-series offset/limit bookkeeping.
+   *
+   * @param offset initial offset stored for every series
+   * @param limit row limit of the query
+   * @param size number of series
+   */
+  private void initLimit(int offset, int limit, int size) {
+    this.limit = limit;
+    this.cachedBatchData = new BatchData[size];
+    this.alreadyReturnedRowNumArray = new AtomicIntegerArray(size);
+    // every series starts with the same offset
+    AtomicIntegerArray offsets = new AtomicIntegerArray(size);
+    for (int seriesIndex = 0; seriesIndex < size; seriesIndex++) {
+      offsets.set(seriesIndex, offset);
+    }
+    this.offsetArray = offsets;
+  }
+
+  /**
+   * One-time setup performed by the first fillBuffer call: prepares the offset/limit state,
+   * remembers the fetch size and submits one ReadTask per series to the pool.
+   *
+   * @param encoder watermark encoder forwarded to every ReadTask
+   * @param fetchSize fetch size of the first request
+   */
+  private void init(WatermarkEncoder encoder, int fetchSize) {
+    final int seriesCount = seriesReaderWithoutValueFilterList.size();
+    initLimit(super.rowOffset, super.rowLimit, seriesCount);
+    this.fetchSize = fetchSize;
+    int seriesIndex = 0;
+    while (seriesIndex < seriesCount) {
+      ManagedSeriesReader seriesReader = seriesReaderWithoutValueFilterList.get(seriesIndex);
+      // flag the reader as active and owned by the pool before handing it to a task
+      seriesReader.setHasRemaining(true);
+      seriesReader.setManagedByQueryManager(true);
+      pool.submit(new ReadTask(seriesReader, blockingQueueArray[seriesIndex], encoder, seriesIndex));
+      seriesIndex++;
+    }
+    this.initialized = true;
+  }
+
+ /**
+ * For RPC in RawData query between client and server:
+ * returns the next time buffer and value buffer of every series.
+ *
+ * <p>The first call starts the read tasks through init(). Each call then takes one
+ * (time, value) pair from every series queue that is not yet exhausted, blocking until the
+ * pair is available. A series that is exhausted contributes zero-length buffers.
+ *
+ * @param fetchSize only handed to init() on the first call; this method does not read it otherwise
+ * @param encoder watermark encoder forwarded to the read tasks submitted from here
+ * @return one time buffer and one value buffer per series, in reader order
+ * @throws InterruptedException if interrupted while waiting on a series queue
+ */
+ public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws InterruptedException {
+ if (!initialized) {
+ init(encoder, fetchSize);
+ }
+ int seriesNum = seriesReaderWithoutValueFilterList.size();
+ TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+ List<ByteBuffer> timeBufferList = new ArrayList<>(seriesNum);
+ List<ByteBuffer> valueBufferList = new ArrayList<>(seriesNum);
+
+ for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+ if (!noMoreDataInQueueArray[seriesIndex]) {
+ // blocks until a pair is available for this series
+ Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex].take();
+ // a pair with a null buffer marks the end of this series: remember it so that take()
+ // is never called on this queue again, and hand empty buffers to the client
+ if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) {
+ noMoreDataInQueueArray[seriesIndex] = true;
+ timeValueByteBufferPair.left = ByteBuffer.allocate(0);
+ timeValueByteBufferPair.right = ByteBuffer.allocate(0);
+ }
+ timeBufferList.add(timeValueByteBufferPair.left);
+ valueBufferList.add(timeValueByteBufferPair.right);
+ }
+ else {
+ // series already exhausted: return empty buffers without touching the queue
+ timeBufferList.add(ByteBuffer.allocate(0));
+ valueBufferList.add(ByteBuffer.allocate(0));
+ continue;
+ }
+
+ // the reader itself is the lock guarding the managed-by-pool state checked below
+ synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
+ if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
+ ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
+ // if the reader isn't being managed and still has more data,
+ // its read task left the pool earlier because the queue had no more space;
+ // now that there is room again, submit it once more
+ if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
+ reader.setManagedByQueryManager(true);
+ pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex],
+ encoder, seriesIndex));
+ }
+ }
+ }
+ }
+
+ // set time buffers and value buffers
+ tsQueryNonAlignDataSet.setTimeList(timeBufferList);
+ tsQueryNonAlignDataSet.setValueList(valueBufferList);
+
+ return tsQueryNonAlignDataSet;
+ }
+
+
+ /**
+ * Row-by-row iteration is not implemented for this data set: always reports no next row.
+ * Data is obtained through fillBuffer instead.
+ */
+ @Override
+ protected boolean hasNextWithoutConstraint() {
+ return false;
+ }
+
+ /**
+ * Row-by-row iteration is not implemented for this data set: always returns null.
+ * Data is obtained through fillBuffer instead.
+ */
+ @Override
+ protected RowRecord nextWithoutConstraint() {
+ return null;
+ }
+
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
index f6582ff..0a10375 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithValueFilter;
import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter;
+import org.apache.iotdb.db.query.dataset.NonAlignEngineDataSet;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
@@ -88,6 +89,28 @@ public class EngineExecutor {
throw new StorageEngineException(e.getMessage());
}
}
+
+  /**
+   * Executes a query without value filter whose result is not aligned by time.
+   *
+   * <p>Builds one SeriesReaderWithoutValueFilter per deduplicated path, applying the global
+   * time filter when an optimized expression is present, and wraps the readers in a
+   * NonAlignEngineDataSet.
+   *
+   * @param context query context passed to every series reader
+   * @return a NonAlignEngineDataSet over the deduplicated paths
+   */
+  public QueryDataSet executeNonAlign(QueryContext context)
+      throws StorageEngineException, IOException {
+
+    // no optimized expression means no time filter at all
+    Filter timeFilter = optimizedExpression == null
+        ? null
+        : ((GlobalTimeExpression) optimizedExpression).getFilter();
+
+    List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
+    int seriesIndex = 0;
+    while (seriesIndex < deduplicatedPaths.size()) {
+      readersOfSelectedSeries.add(new SeriesReaderWithoutValueFilter(
+          deduplicatedPaths.get(seriesIndex), deduplicatedDataTypes.get(seriesIndex),
+          timeFilter, context, true));
+      seriesIndex++;
+    }
+
+    return new NonAlignEngineDataSet(deduplicatedPaths, deduplicatedDataTypes,
+        readersOfSelectedSeries);
+  }
/**
* executeWithValueFilter query.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index a5cab17..34a4ef6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
@@ -65,7 +64,12 @@ public class EngineQueryRouter implements IEngineQueryRouter {
EngineExecutor engineExecutor = new EngineExecutor(deduplicatedPaths, deduplicatedDataTypes,
optimizedExpression);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
- return engineExecutor.executeWithoutValueFilter(context);
+ if (queryPlan.isAlign()) {
+ return engineExecutor.executeWithoutValueFilter(context);
+ }
+ else {
+ return engineExecutor.executeNonAlign(context);
+ }
} else {
return engineExecutor.executeWithValueFilter(context);
}
@@ -76,7 +80,12 @@ public class EngineQueryRouter implements IEngineQueryRouter {
} else {
EngineExecutor engineExecutor = new EngineExecutor(deduplicatedPaths, deduplicatedDataTypes);
try {
- return engineExecutor.executeWithoutValueFilter(context);
+ if (queryPlan.isAlign()) {
+ return engineExecutor.executeWithoutValueFilter(context);
+ }
+ else {
+ return engineExecutor.executeNonAlign(context);
+ }
} catch (IOException e) {
throw new StorageEngineException(e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java b/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
index 3750746..87157bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.rescon;
-import java.util.ArrayDeque;
-import java.util.Deque;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -27,6 +25,9 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
public class MemTablePool {
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0e08f16..4517400 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,27 +18,6 @@
*/
package org.apache.iotdb.db.service;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -67,46 +46,16 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter;
+import org.apache.iotdb.db.query.dataset.NonAlignEngineDataSet;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteInsertRowInBatchResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -119,6 +68,17 @@ import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+
/**
* Thrift RPC implementation at server side.
*/
@@ -194,7 +154,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return TSDataType.DOUBLE;
default:
throw new QueryProcessException(
- "aggregate does not support " + aggrType + " function.");
+ "aggregate does not support " + aggrType + " function.");
}
}
return MManager.getInstance().getSeriesType(path);
@@ -203,7 +163,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
logger.info("{}: receive open session request from username {}", IoTDBConstant.GLOBAL_DB_NAME,
- req.getUsername());
+ req.getUsername());
boolean status;
IAuthorizer authorizer;
@@ -230,10 +190,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
tsStatus = getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR);
}
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
- TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
resp.setSessionId(sessionId);
logger.info("{}: Login status: {}. User : {}", IoTDBConstant.GLOBAL_DB_NAME,
- tsStatus.getStatusType().getMessage(), req.getUsername());
+ tsStatus.getStatusType().getMessage(), req.getUsername());
return resp;
}
@@ -269,9 +229,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
if (!exceptions.isEmpty()) {
return new TSStatus(
- getStatus(TSStatusCode.CLOSE_OPERATION_ERROR,
- String.format("%d errors in closeOperation, see server logs for detail",
- exceptions.size())));
+ getStatus(TSStatusCode.CLOSE_OPERATION_ERROR,
+ String.format("%d errors in closeOperation, see server logs for detail",
+ exceptions.size())));
}
return new TSStatus(tsStatus);
@@ -373,7 +333,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
} catch (QueryProcessException | MetadataException | OutOfMemoryError e) {
logger
- .error(String.format("Failed to fetch timeseries %s's metadata", req.getColumnPath()), e);
+ .error(String.format("Failed to fetch timeseries %s's metadata", req.getColumnPath()), e);
status = getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
resp.setStatus(status);
return resp;
@@ -415,7 +375,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
switch (statement) {
case "merge":
StorageEngine.getInstance()
- .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
return true;
case "full merge":
StorageEngine.getInstance().mergeAll(true);
@@ -461,16 +421,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
for (String statement : statements) {
long t2 = System.currentTimeMillis();
isAllSuccessful =
- executeStatementInBatch(statement, batchErrorMessage, result,
- req.getSessionId()) && isAllSuccessful;
+ executeStatementInBatch(statement, batchErrorMessage, result,
+ req.getSessionId()) && isAllSuccessful;
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
}
if (isAllSuccessful) {
return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS,
- "Execute batch statements successfully"), result);
+ "Execute batch statements successfully"), result);
} else {
return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
- batchErrorMessage.toString()), result);
+ batchErrorMessage.toString()), result);
}
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
@@ -480,16 +440,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// execute one statement of a batch. Currently, query is not allowed in a batch statement and
// on finding queries in a batch, such query will be ignored and an error will be generated
private boolean executeStatementInBatch(String statement, StringBuilder batchErrorMessage,
- List<Integer> result, long sessionId) {
+ List<Integer> result, long sessionId) {
try {
PhysicalPlan physicalPlan = processor
- .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId));
+ .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId));
if (physicalPlan.isQuery()) {
throw new QueryInBatchStatementException(statement);
}
TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan, sessionId);
if (resp.getStatus().getStatusType().getCode() == TSStatusCode.SUCCESS_STATUS
- .getStatusCode()) {
+ .getStatusCode()) {
result.add(Statement.SUCCESS_NO_INFO);
} else {
result.add(Statement.EXECUTE_FAILED);
@@ -508,8 +468,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return false;
} catch (QueryProcessException e) {
logger.info(
- "Error occurred when executing {}, meet error while parsing SQL to physical plan: {}",
- statement, e.getMessage());
+ "Error occurred when executing {}, meet error while parsing SQL to physical plan: {}",
+ statement, e.getMessage());
result.add(Statement.EXECUTE_FAILED);
batchErrorMessage.append(TSStatusCode.SQL_PARSE_ERROR.getStatusCode()).append("\n");
return false;
@@ -537,13 +497,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (execAdminCommand(statement, req.getSessionId())) {
return getTSExecuteStatementResp(
- getStatus(TSStatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS"));
+ getStatus(TSStatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS"));
}
PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement,
- sessionIdZoneIdMap.get(req.getSessionId()));
+ sessionIdZoneIdMap.get(req.getSessionId()));
if (physicalPlan.isQuery()) {
resp = executeQueryStatement(req.statementId, physicalPlan, req.fetchSize,
- sessionIdUsernameMap.get(req.getSessionId()));
+ sessionIdUsernameMap.get(req.getSessionId()));
long endTime = System.currentTimeMillis();
sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
sqlArgumentsList.add(sqlArgument);
@@ -560,15 +520,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (SQLParserException e) {
logger.error("check metadata error: ", e);
return getTSExecuteStatementResp(getStatus(TSStatusCode.METADATA_ERROR,
- "Check metadata error: " + e.getMessage()));
+ "Check metadata error: " + e.getMessage()));
} catch (QueryProcessException e) {
logger.info(ERROR_PARSING_SQL, e.getMessage());
return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR,
- "Statement format is not right: " + e.getMessage()));
+ "Statement format is not right: " + e.getMessage()));
} catch (StorageEngineException e) {
logger.info(ERROR_PARSING_SQL, e.getMessage());
return getTSExecuteStatementResp(getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR,
- e.getMessage()));
+ e.getMessage()));
}
}
@@ -577,7 +537,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
* AuthorPlan
*/
private TSExecuteStatementResp executeQueryStatement(long statementId, PhysicalPlan plan,
- int fetchSize, String username) {
+ int fetchSize, String username) {
long t1 = System.currentTimeMillis();
try {
TSExecuteStatementResp resp; // column headers
@@ -588,6 +548,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} else {
resp = getQueryColumnHeaders(plan, username);
}
+ if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlign()) {
+ if (plan.getOperatorType() == OperatorType.AGGREGATION) {
+ throw new QueryProcessException("Aggregation doesn't support disable align clause.");
+ }
+ if (plan.getOperatorType() == OperatorType.FILL) {
+ throw new QueryProcessException("Fill doesn't support disable align clause.");
+ }
+ if (plan.getOperatorType() == OperatorType.GROUPBY) {
+ throw new QueryProcessException("Group by doesn't support disable align clause.");
+ }
+ }
if (plan.getOperatorType() == OperatorType.AGGREGATION) {
resp.setIgnoreTimeStamp(true);
} // else default ignoreTimeStamp is false
@@ -600,14 +571,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// create and cache dataset
QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
- TSQueryDataSet result = fillRpcReturnData(fetchSize, newDataSet, username);
- resp.setQueryDataSet(result);
- resp.setQueryId(queryId);
+ if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlign()) {
+ TSQueryNonAlignDataSet result = fillRpcNonAlignReturnData(fetchSize, newDataSet, username);
+ resp.setNonAlignQueryDataSet(result);
+ resp.setQueryId(queryId);
+ }
+ else {
+ TSQueryDataSet result = fillRpcReturnData(fetchSize, newDataSet, username);
+ resp.setQueryDataSet(result);
+ resp.setQueryId(queryId);
+ }
return resp;
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return getTSExecuteStatementResp(
- getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
+ getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
}
@@ -624,7 +602,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PhysicalPlan physicalPlan;
try {
physicalPlan = processor
- .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
+ .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
} catch (QueryProcessException | SQLParserException e) {
logger.info(ERROR_PARSING_SQL, e.getMessage());
return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR, e.getMessage()));
@@ -632,14 +610,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (!physicalPlan.isQuery()) {
return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
- "Statement is not a query statement."));
+ "Statement is not a query statement."));
}
return executeQueryStatement(req.statementId, physicalPlan, req.fetchSize,
- sessionIdUsernameMap.get(req.getSessionId()));
+ sessionIdUsernameMap.get(req.getSessionId()));
}
private TSExecuteStatementResp getShowQueryColumnHeaders(ShowPlan showPlan)
- throws QueryProcessException {
+ throws QueryProcessException {
switch (showPlan.getShowContentType()) {
case TTL:
return StaticResps.TTL_RESP;
@@ -666,7 +644,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
default:
logger.error("Unsupported show content type: {}", showPlan.getShowContentType());
throw new QueryProcessException(
- "Unsupported show content type:" + showPlan.getShowContentType());
+ "Unsupported show content type:" + showPlan.getShowContentType());
}
}
@@ -685,7 +663,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return StaticResps.LIST_USER_PRIVILEGE_RESP;
default:
return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR,
- String.format("%s is not an auth query", authorPlan.getAuthorType())));
+ String.format("%s is not an auth query", authorPlan.getAuthorType())));
}
}
@@ -694,7 +672,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
* get ResultSet schema
*/
private TSExecuteStatementResp getQueryColumnHeaders(PhysicalPlan physicalPlan, String username)
- throws AuthException, TException, QueryProcessException {
+ throws AuthException, TException, QueryProcessException {
List<String> respColumns = new ArrayList<>();
List<String> columnsTypes = new ArrayList<>();
@@ -702,7 +680,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// check permissions
if (!checkAuthorization(physicalPlan.getPaths(), physicalPlan, username)) {
return getTSExecuteStatementResp(getStatus(TSStatusCode.NO_PERMISSION_ERROR,
- "No permissions for this operation " + physicalPlan.getOperatorType()));
+ "No permissions for this operation " + physicalPlan.getOperatorType()));
}
TSExecuteStatementResp resp = getTSExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS));
@@ -714,7 +692,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// set dataTypeList in TSExecuteStatementResp. Note this is without deduplication.
resp.setColumns(respColumns);
resp.setDataTypeList(columnsTypes);
- } else {
+ }
+ else {
getWideQueryHeaders(plan, respColumns, columnsTypes);
resp.setColumns(respColumns);
resp.setDataTypeList(columnsTypes);
@@ -724,7 +703,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// wide means not group by device
private void getWideQueryHeaders(QueryPlan plan, List<String> respColumns,
- List<String> columnTypes) throws TException, QueryProcessException {
+ List<String> columnTypes) throws TException, QueryProcessException {
// Restore column header of aggregate to func(column_name), only
// support single aggregate function for now
List<Path> paths = plan.getPaths();
@@ -757,7 +736,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
private void getGroupByDeviceQueryHeaders(QueryPlan plan, List<String> respColumns,
- List<String> columnTypes) {
+ List<String> columnTypes) {
// set columns in TSExecuteStatementResp. Note this is without deduplication.
List<String> measurementColumns = plan.getMeasurements();
respColumns.add(SQLConstant.GROUPBY_DEVICE_COLUMN_NAME);
@@ -802,23 +781,45 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (!queryId2DataSet.containsKey(req.queryId)) {
return getTSFetchResultsResp(
- getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
+ getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
}
QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
- TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
- sessionIdUsernameMap.get(req.sessionId));
-
- boolean hasResultSet = result.bufferForTime().limit() != 0;
- if (!hasResultSet) {
- queryId2DataSet.remove(req.queryId);
+ if (req.isAlign) {
+ TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
+ sessionIdUsernameMap.get(req.sessionId));
+ boolean hasResultSet = result.bufferForTime().limit() != 0;
+ if (!hasResultSet) {
+ QueryResourceManager.getInstance().endQuery(req.queryId);
+ queryId2DataSet.remove(req.queryId);
+ }
+ TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
+ "FetchResult successfully. Has more result: " + hasResultSet));
+ resp.setHasResultSet(hasResultSet);
+ resp.setQueryDataSet(result);
+ resp.setIsAlign(true);
+ return resp;
+ }
+ else {
+ TSQueryNonAlignDataSet nonAlignResult = fillRpcNonAlignReturnData(req.fetchSize, queryDataSet,
+ sessionIdUsernameMap.get(req.sessionId));
+ boolean hasResultSet = false;
+ for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
+ if (timeBuffer.limit() != 0) {
+ hasResultSet = true;
+ break;
+ }
+ }
+ if (!hasResultSet) {
+ queryId2DataSet.remove(req.queryId);
+ }
+ TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
+ "FetchResult successfully. Has more result: " + hasResultSet));
+ resp.setHasResultSet(hasResultSet);
+ resp.setNonAlignQueryDataSet(nonAlignResult);
+ resp.setIsAlign(false);
+ return resp;
}
-
- TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
- "FetchResult successfully. Has more result: " + hasResultSet));
- resp.setHasResultSet(hasResultSet);
- resp.setQueryDataSet(result);
- return resp;
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return getTSFetchResultsResp(getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
@@ -841,7 +842,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
encoder = new GroupedLSBWatermarkEncoder(config);
} else {
throw new UnSupportedDataTypeException(String.format(
- "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+ "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
}
if (queryDataSet instanceof NewEngineDataSetWithoutValueFilter) {
// optimize for query without value filter
@@ -860,11 +861,39 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return result;
}
+  /**
+   * Fills the RPC payload of a "disable align" query from the cached data set.
+   *
+   * <p>When watermarking is enabled in the config and for the given user, a grouped-LSB
+   * watermark encoder is passed to the data set; otherwise the encoder is null.
+   *
+   * @param fetchSize fetch size forwarded to the data set
+   * @param queryDataSet data set of the query; it is cast to NonAlignEngineDataSet
+   * @param userName user whose watermark setting is checked
+   * @return the time and value buffers produced by NonAlignEngineDataSet.fillBuffer
+   * @throws TException if the authorizer instance cannot be obtained
+   */
+  private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(int fetchSize, QueryDataSet queryDataSet,
+      String userName) throws TException, AuthException, InterruptedException {
+    IAuthorizer authorizer;
+    try {
+      authorizer = LocalFileAuthorizer.getInstance();
+    } catch (AuthException e) {
+      throw new TException(e);
+    }
+
+    // stays null unless watermarking applies to this user
+    WatermarkEncoder watermarkEncoder = null;
+    if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) {
+      if (!config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
+        throw new UnSupportedDataTypeException(String.format(
+            "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+      }
+      watermarkEncoder = new GroupedLSBWatermarkEncoder(config);
+    }
+    return ((NonAlignEngineDataSet) queryDataSet).fillBuffer(fetchSize, watermarkEncoder);
+  }
+
+
+
/**
* create QueryDataSet and buffer it for fetchResults
*/
private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan) throws
- QueryProcessException, QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException {
+ QueryProcessException, QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException {
QueryContext context = new QueryContext(queryId);
QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan, context);
@@ -884,7 +913,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (Exception e) {
logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return getTSExecuteStatementResp(
- getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
+ getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
@@ -904,7 +933,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new QueryProcessException(
- "Current system mode is read-only, does not support non-query operation");
+ "Current system mode is read-only, does not support non-query operation");
}
return processor.getExecutor().processNonQuery(plan);
}
@@ -921,7 +950,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (physicalPlan.isQuery()) {
return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
- "Statement is a query statement."));
+ "Statement is a query statement."));
}
return executeUpdateStatement(physicalPlan, sessionId);
@@ -937,7 +966,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
private boolean checkAuthorization(List<Path> paths, PhysicalPlan plan, String username)
- throws AuthException {
+ throws AuthException {
String targetUser = null;
if (plan instanceof AuthorPlan) {
targetUser = ((AuthorPlan) plan).getUserName();
@@ -953,7 +982,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
private TSExecuteBatchStatementResp getTSBatchExecuteStatementResp(TSStatus status,
- List<Integer> result) {
+ List<Integer> result) {
TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp();
TSStatus tsStatus = new TSStatus(status);
resp.setStatus(tsStatus);
@@ -1013,7 +1042,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME);
properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME);
properties
- .setTimestampPrecision(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
+ .setTimestampPrecision(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
return properties;
}
@@ -1122,7 +1151,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
BatchInsertPlan batchInsertPlan = new BatchInsertPlan(req.deviceId, req.measurements);
batchInsertPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps, req.size));
batchInsertPlan.setColumns(QueryDataSetUtils
- .readValuesFromBuffer(req.values, req.types, req.measurements.size(), req.size));
+ .readValuesFromBuffer(req.values, req.types, req.measurements.size(), req.size));
batchInsertPlan.setRowCount(req.size);
batchInsertPlan.setDataTypes(req.types);
@@ -1143,16 +1172,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (isAllSuccessful) {
logger.debug("Insert one RowBatch successfully");
return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS),
- Arrays.asList(results));
+ Arrays.asList(results));
} else {
logger.debug("Insert one RowBatch failed!");
return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.INTERNAL_SERVER_ERROR),
- Arrays.asList(results));
+ Arrays.asList(results));
}
} catch (Exception e) {
logger.info("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
return getTSBatchExecuteStatementResp(
- getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()), null);
+ getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()), null);
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
}
@@ -1198,8 +1227,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new Path(req.getPath()),
- TSDataType.values()[req.getDataType()], TSEncoding.values()[req.getEncoding()],
- CompressionType.values()[req.compressor], new HashMap<>());
+ TSDataType.values()[req.getDataType()], TSEncoding.values()[req.getEncoding()],
+ CompressionType.values()[req.compressor], new HashMap<>());
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
return new TSStatus(status);
@@ -1237,7 +1266,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
try {
if (!checkAuthorization(paths, plan, sessionIdUsernameMap.get(sessionId))) {
return getStatus(TSStatusCode.NO_PERMISSION_ERROR,
- "No permissions for this operation " + plan.getOperatorType().toString());
+ "No permissions for this operation " + plan.getOperatorType().toString());
}
} catch (AuthException e) {
logger.error("meet error while checking authorization.", e);
@@ -1256,7 +1285,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
return execRet ? getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
- : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
private long generateQueryId(boolean isDataQuery) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 8fa3f43..4209ba3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -175,7 +175,6 @@ public class QueryDataSetUtils {
return tsQueryDataSet;
}
-
public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
long[] times = new long[size];
for (int i = 0; i < size; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDisableAlignIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDisableAlignIT.java
new file mode 100644
index 0000000..dc85e7a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDisableAlignIT.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
+public class IoTDBDisableAlignIT {
+
+ private static IoTDB daemon;
+ private static String[] sqls = new String[]{
+
+ "SET STORAGE GROUP TO root.vehicle",
+ "SET STORAGE GROUP TO root.other",
+
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+
+ "CREATE TIMESERIES root.vehicle.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+
+ "CREATE TIMESERIES root.other.d1.s0 WITH DATATYPE=FLOAT, ENCODING=RLE",
+
+ "insert into root.vehicle.d0(timestamp,s0) values(1,101)",
+ "insert into root.vehicle.d0(timestamp,s0) values(2,198)",
+ "insert into root.vehicle.d0(timestamp,s0) values(100,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(101,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(102,80)",
+ "insert into root.vehicle.d0(timestamp,s0) values(103,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(104,90)",
+ "insert into root.vehicle.d0(timestamp,s0) values(105,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(106,99)",
+ "insert into root.vehicle.d0(timestamp,s0) values(2,10000)",
+ "insert into root.vehicle.d0(timestamp,s0) values(50,10000)",
+ "insert into root.vehicle.d0(timestamp,s0) values(1000,22222)",
+
+ "insert into root.vehicle.d0(timestamp,s1) values(1,1101)",
+ "insert into root.vehicle.d0(timestamp,s1) values(2,198)",
+ "insert into root.vehicle.d0(timestamp,s1) values(100,199)",
+ "insert into root.vehicle.d0(timestamp,s1) values(101,199)",
+ "insert into root.vehicle.d0(timestamp,s1) values(102,180)",
+ "insert into root.vehicle.d0(timestamp,s1) values(103,199)",
+ "insert into root.vehicle.d0(timestamp,s1) values(104,190)",
+ "insert into root.vehicle.d0(timestamp,s1) values(105,199)",
+ "insert into root.vehicle.d0(timestamp,s1) values(2,40000)",
+ "insert into root.vehicle.d0(timestamp,s1) values(50,50000)",
+ "insert into root.vehicle.d0(timestamp,s1) values(1000,55555)",
+
+ "insert into root.vehicle.d0(timestamp,s2) values(1000,55555)",
+ "insert into root.vehicle.d0(timestamp,s2) values(2,2.22)",
+ "insert into root.vehicle.d0(timestamp,s2) values(3,3.33)",
+ "insert into root.vehicle.d0(timestamp,s2) values(4,4.44)",
+ "insert into root.vehicle.d0(timestamp,s2) values(102,10.00)",
+ "insert into root.vehicle.d0(timestamp,s2) values(105,11.11)",
+ "insert into root.vehicle.d0(timestamp,s2) values(1000,1000.11)",
+
+ "insert into root.vehicle.d0(timestamp,s3) values(60,'aaaaa')",
+ "insert into root.vehicle.d0(timestamp,s3) values(70,'bbbbb')",
+ "insert into root.vehicle.d0(timestamp,s3) values(80,'ccccc')",
+ "insert into root.vehicle.d0(timestamp,s3) values(101,'ddddd')",
+ "insert into root.vehicle.d0(timestamp,s3) values(102,'fffff')",
+
+ "insert into root.vehicle.d1(timestamp,s0) values(1,999)",
+ "insert into root.vehicle.d1(timestamp,s0) values(1000,888)",
+
+ "insert into root.vehicle.d0(timestamp,s1) values(2000-01-01T08:00:00+08:00, 100)",
+ "insert into root.vehicle.d0(timestamp,s3) values(2000-01-01T08:00:00+08:00, 'good')",
+
+ "insert into root.vehicle.d0(timestamp,s4) values(100, false)",
+ "insert into root.vehicle.d0(timestamp,s4) values(100, true)",
+
+ "insert into root.other.d1(timestamp,s0) values(2, 3.14)",};
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ daemon = IoTDB.getInstance();
+ daemon.active();
+ EnvironmentUtils.envSetUp();
+
+ insertData();
+
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ daemon.stop();
+ EnvironmentUtils.cleanEnv();
+ }
+
+ private static void insertData() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (String sql : sqls) {
+ statement.execute(sql);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void selectTest() throws ClassNotFoundException {
+ String[] retArray = new String[]{
+ "1,101,1,1101,2,2.22,60,aaaaa,100,true,1,999,",
+ "2,10000,2,40000,3,3.33,70,bbbbb,null,null,1000,888,",
+ "50,10000,50,50000,4,4.44,80,ccccc,null,null,null,null,",
+ "100,99,100,199,102,10.0,101,ddddd,null,null,null,null,",
+ "101,99,101,199,105,11.11,102,fffff,null,null,null,null,",
+ "102,80,102,180,1000,1000.11,946684800000,good,null,null,null,null,",
+ "103,99,103,199,null,null,null,null,null,null,null,null,",
+ "104,90,104,190,null,null,null,null,null,null,null,null,",
+ "105,99,105,199,null,null,null,null,null,null,null,null,",
+ "106,99,1000,55555,null,null,null,null,null,null,null,null,",
+ "1000,22222,946684800000,100,null,null,null,null,null,null,null,null,",
+ };
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(
+ "select * from root.vehicle disable align");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ StringBuilder header = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ header.append(resultSetMetaData.getColumnName(i)).append(",");
+ }
+ Assert.assertEquals(
+ "Timeroot.vehicle.d0.s0,root.vehicle.d0.s0,"
+ + "Timeroot.vehicle.d0.s1,root.vehicle.d0.s1,"
+ + "Timeroot.vehicle.d0.s2,root.vehicle.d0.s2,"
+ + "Timeroot.vehicle.d0.s3,root.vehicle.d0.s3,"
+ + "Timeroot.vehicle.d0.s4,root.vehicle.d0.s4,"
+ + "Timeroot.vehicle.d1.s0,root.vehicle.d1.s0,", header.toString());
+ Assert.assertEquals(Types.TIMESTAMP, resultSetMetaData.getColumnType(1));
+ Assert.assertEquals(Types.INTEGER, resultSetMetaData.getColumnType(2));
+ Assert.assertEquals(Types.BIGINT, resultSetMetaData.getColumnType(3));
+ Assert.assertEquals(Types.FLOAT, resultSetMetaData.getColumnType(4));
+ Assert.assertEquals(Types.VARCHAR, resultSetMetaData.getColumnType(5));
+ Assert.assertEquals(Types.BOOLEAN, resultSetMetaData.getColumnType(6));
+ Assert.assertEquals(Types.INTEGER, resultSetMetaData.getColumnType(7));
+
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ builder.append(resultSet.getString(i)).append(",");
+ }
+ Assert.assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ Assert.assertEquals(11, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectWithDuplicatedPathsTest() throws ClassNotFoundException {
+ String[] retArray = new String[]{
+ "1,101,1,101,1,1101,",
+ "2,10000,2,10000,2,40000,",
+ "50,10000,50,10000,50,50000,",
+ "100,99,100,99,100,199,",
+ "101,99,101,99,101,199,",
+ "102,80,102,80,102,180,",
+ "103,99,103,99,103,199,",
+ "104,90,104,90,104,190,",
+ "105,99,105,99,105,199,",
+ "106,99,106,99,1000,55555,",
+ "1000,22222,1000,22222,946684800000,100,",
+ };
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(
+ "select s0,s0,s1 from root.vehicle.d0 disable align");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ StringBuilder header = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ header.append(resultSetMetaData.getColumnName(i)).append(",");
+ }
+ Assert.assertEquals("Timeroot.vehicle.d0.s0,root.vehicle.d0.s0,"
+ + "Timeroot.vehicle.d0.s0,root.vehicle.d0.s0,"
+ + "Timeroot.vehicle.d0.s1,root.vehicle.d0.s1,", header.toString());
+ Assert.assertEquals(Types.TIMESTAMP, resultSetMetaData.getColumnType(1));
+ Assert.assertEquals(Types.INTEGER, resultSetMetaData.getColumnType(2));
+ Assert.assertEquals(Types.INTEGER, resultSetMetaData.getColumnType(3));
+ Assert.assertEquals(Types.BIGINT, resultSetMetaData.getColumnType(4));
+
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ builder.append(resultSet.getString(i)).append(",");
+ }
+ Assert.assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ Assert.assertEquals(11, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectLimitTest() throws ClassNotFoundException {
+ String[] retArray = new String[]{
+ "2,10000,1000,888,2,40000,3,3.33,70,bbbbb,null,null,",
+ "50,10000,null,null,50,50000,4,4.44,80,ccccc,null,null,",
+ "100,99,null,null,100,199,102,10.0,101,ddddd,null,null,",
+ "101,99,null,null,101,199,105,11.11,102,fffff,null,null,",
+ "102,80,null,null,102,180,1000,1000.11,946684800000,good,null,null,",
+ "103,99,null,null,103,199,null,null,null,null,null,null,",
+ "104,90,null,null,104,190,null,null,null,null,null,null,",
+ "105,99,null,null,105,199,null,null,null,null,null,null,",
+ "106,99,null,null,1000,55555,null,null,null,null,null,null,",
+ "1000,22222,null,null,946684800000,100,null,null,null,null,null,null,",
+ };
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(
+ "select s0,s1,s2,s3,s4 from root.vehicle.* limit 10 offset 1 disable align");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ StringBuilder header = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ header.append(resultSetMetaData.getColumnName(i)).append(",");
+ }
+ Assert.assertEquals(
+ "Timeroot.vehicle.d0.s0,root.vehicle.d0.s0,"
+ + "Timeroot.vehicle.d1.s0,root.vehicle.d1.s0,"
+ + "Timeroot.vehicle.d0.s1,root.vehicle.d0.s1,"
+ + "Timeroot.vehicle.d0.s2,root.vehicle.d0.s2,"
+ + "Timeroot.vehicle.d0.s3,root.vehicle.d0.s3,"
+ + "Timeroot.vehicle.d0.s4,root.vehicle.d0.s4,", header.toString());
+ Assert.assertEquals(Types.TIMESTAMP, resultSetMetaData.getColumnType(1));
+ Assert.assertEquals(Types.INTEGER, resultSetMetaData.getColumnType(2));
+ Assert.assertEquals(Types.INTEGER, resultSetMetaData.getColumnType(3));
+ Assert.assertEquals(Types.BIGINT, resultSetMetaData.getColumnType(4));
+ Assert.assertEquals(Types.FLOAT, resultSetMetaData.getColumnType(5));
+ Assert.assertEquals(Types.VARCHAR, resultSetMetaData.getColumnType(6));
+ Assert.assertEquals(Types.BOOLEAN, resultSetMetaData.getColumnType(7));
+
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ builder.append(resultSet.getString(i)).append(",");
+ }
+ Assert.assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ Assert.assertEquals(10, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectSlimitTest() throws ClassNotFoundException {
+ String[] retArray = new String[]{
+ "1,1101,2,2.22,",
+ "2,40000,3,3.33,",
+ "50,50000,4,4.44,",
+ "100,199,102,10.0,",
+ "101,199,105,11.11,",
+ "102,180,1000,1000.11,",
+ "103,199,null,null,",
+ "104,190,null,null,",
+ "105,199,null,null,",
+ "1000,55555,null,null,",
+ "946684800000,100,null,null,",
+ };
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(
+ "select * from root.vehicle.* slimit 2 soffset 1 disable align");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ StringBuilder header = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ header.append(resultSetMetaData.getColumnName(i)).append(",");
+ }
+ Assert.assertEquals(
+ "Timeroot.vehicle.d0.s1,root.vehicle.d0.s1,"
+ + "Timeroot.vehicle.d0.s2,root.vehicle.d0.s2,", header.toString());
+ Assert.assertEquals(Types.TIMESTAMP, resultSetMetaData.getColumnType(1));
+ Assert.assertEquals(Types.BIGINT, resultSetMetaData.getColumnType(2));
+ Assert.assertEquals(Types.FLOAT, resultSetMetaData.getColumnType(3));
+
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ builder.append(resultSet.getString(i)).append(",");
+ }
+ Assert.assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ Assert.assertEquals(11, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void errorCaseTest1() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(
+ "select * from root.vehicle where time = 3 Fill(int32[previous, 5ms]) disable align");
+ fail("No exception thrown.");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Fill doesn't support disable align clause."));
+ }
+ }
+
+ @Test
+ public void errorCaseTest2() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(
+ "select count(*) from root.vehicle GROUP BY ([2,50],20ms) disable align");
+ fail("No exception thrown.");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Group by doesn't support disable align clause."));
+ }
+ }
+
+ @Test
+ public void errorCaseTest3() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(
+ "select count(*) from root disable align");
+ fail("No exception thrown.");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Aggregation doesn't support disable align clause."));
+ }
+ }
+
+}
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index aca2625..971df2c 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -107,9 +107,11 @@ Last Updated on November 12th, 2019 by Tian Jiang.
| Latest Changes | Related Committers |
| ------------------------------------------------------------ | ---------------------------------- |
| Add parameter sessionId in getTimeZone, getProperties, setStorageGroup, createTimeseries... | Tian Jiang|
+| Add struct TSQueryNonAlignDataSet | Haonan Hou|
## 3. Update
| Latest Changes | Related Committers |
| ------------------------------------------------------------ | ---------------------- |
-| Replace TS_SessionHandles with SessionIds, TSOperationHandle with queryIds | Tian Jiang |
\ No newline at end of file
+| Replace TS_SessionHandles with SessionIds, TSOperationHandle with queryIds | Tian Jiang |
+| Add optional TSQueryNonAlignDataSet in TSExecuteStatementResp, TSFetchResultsResp and required bool isAlign in TSFetchResultsReq | Haonan Hou |
\ No newline at end of file
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index f68f75d..c47ad38 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -41,6 +41,8 @@ struct TSExecuteStatementResp {
// Data type list of columns in select statement of SQL
6: optional list<string> dataTypeList
7: optional TSQueryDataSet queryDataSet
+ // for disable align statements, queryDataSet is null and nonAlignQueryDataSet is not null
+ 8: optional TSQueryNonAlignDataSet nonAlignQueryDataSet
}
enum TSProtocolVersion {
@@ -140,12 +142,15 @@ struct TSFetchResultsReq{
2: required string statement
3: required i32 fetchSize
4: required i64 queryId
+ 5: required bool isAlign
}
struct TSFetchResultsResp{
1: required TSStatus status
2: required bool hasResultSet
- 3: optional TSQueryDataSet queryDataSet
+ 3: required bool isAlign
+ 4: optional TSQueryDataSet queryDataSet
+ 5: optional TSQueryNonAlignDataSet nonAlignQueryDataSet
}
struct TSFetchMetadataResp{
@@ -219,12 +224,19 @@ struct ServerProperties {
}
struct TSQueryDataSet{
- // ByteBuffer for time column
- 1: required binary time
- // ByteBuffer for each column values
- 2: required list<binary> valueList
- // Bitmap for each column to indicate whether it is a null value
- 3: required list<binary> bitmapList
+ // ByteBuffer for time column
+ 1: required binary time
+ // ByteBuffer for each column values
+ 2: required list<binary> valueList
+ // Bitmap for each column to indicate whether it is a null value
+ 3: required list<binary> bitmapList
+}
+
+struct TSQueryNonAlignDataSet{
+ // ByteBuffer for each time column
+ 1: required list<binary> timeList
+ // ByteBuffer for each column values
+ 2: required list<binary> valueList
}
diff --git a/session/pom.xml b/session/pom.xml
index 8df9ae6..dab30aa 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -19,134 +19,132 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>iotdb-parent</artifactId>
- <groupId>org.apache.iotdb</groupId>
- <version>0.10.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>iotdb-session</artifactId>
- <name>IoTDB Session</name>
- <properties>
- <session.test.skip>false</session.test.skip>
- <session.it.skip>${session.test.skip}</session.it.skip>
- <session.ut.skip>${session.test.skip}</session.ut.skip>
- </properties>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.1.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <!-- this is used for inheritance merges -->
- <phase>package</phase>
- <!-- bind to the packaging phase -->
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!--using `mvn test` to run UT, `mvn verify` to run ITs
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>iotdb-parent</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>0.10.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>iotdb-session</artifactId>
+ <name>IoTDB Session</name>
+ <properties>
+ <session.test.skip>false</session.test.skip>
+ <session.it.skip>${session.test.skip}</session.it.skip>
+ <session.ut.skip>${session.test.skip}</session.ut.skip>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!--using `mvn test` to run UT, `mvn verify` to run ITs
Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skipTests>${session.ut.skip}</skipTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <executions>
- <execution>
- <id>run-integration-tests</id>
- <phase>integration-test</phase>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <skipTests>${session.test.skip}</skipTests>
- <skipITs>${session.it.skip}</skipITs>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>service-rpc</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>tsfile</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-jdbc</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
- </dependencies>
- <profiles>
- <profile>
- <id>skipSessionTests</id>
- <activation>
- <property>
- <name>skipTests</name>
- <value>true</value>
- </property>
- </activation>
- <properties>
- <session.test.skip>true</session.test.skip>
- <session.ut.skip>true</session.ut.skip>
- <session.it.skip>true</session.it.skip>
- </properties>
- </profile>
- <profile>
- <id>skipUT_SessionTests</id>
- <activation>
- <property>
- <name>skipUTs</name>
- <value>true</value>
- </property>
- </activation>
- <properties>
- <session.ut.skip>true</session.ut.skip>
- </properties>
- </profile>
- </profiles>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>${session.ut.skip}</skipTests>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>run-integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <skipTests>${session.test.skip}</skipTests>
+ <skipITs>${session.it.skip}</skipITs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>service-rpc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-jdbc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>skipSessionTests</id>
+ <activation>
+ <property>
+ <name>skipTests</name>
+ <value>true</value>
+ </property>
+ </activation>
+ <properties>
+ <session.test.skip>true</session.test.skip>
+ <session.ut.skip>true</session.ut.skip>
+ <session.it.skip>true</session.it.skip>
+ </properties>
+ </profile>
+ <profile>
+ <id>skipUT_SessionTests</id>
+ <activation>
+ <property>
+ <name>skipUTs</name>
+ <value>true</value>
+ </property>
+ </activation>
+ <properties>
+ <session.ut.skip>true</session.ut.skip>
+ </properties>
+ </profile>
+ </profiles>
</project>
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index 14cb38d..5a2bae2 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -87,7 +87,7 @@ public class SessionDataSet {
if (hasCachedRecord)
return true;
if (tsQueryDataSet == null || !tsQueryDataSet.time.hasRemaining()) {
- TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, batchSize, queryId);
+ TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, batchSize, queryId, true);
try {
TSFetchResultsResp resp = client.fetchResults(req);
RpcUtils.verifySuccess(resp.getStatus());