You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/29 09:03:46 UTC
[iotdb] branch master updated: [IOTDB-3936]Add an interface in IClientRPCService to directly return bytebuffer instead of TSQueryDataSet
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2ca857db14 [IOTDB-3936]Add an interface in IClientRPCService to directly return bytebuffer instead of TSQueryDataSet
2ca857db14 is described below
commit 2ca857db14025c589cdcb0e92aa1300e6e98330c
Author: YangCaiyin <51...@users.noreply.github.com>
AuthorDate: Sat Oct 29 17:03:41 2022 +0800
[IOTDB-3936]Add an interface in IClientRPCService to directly return bytebuffer instead of TSQueryDataSet
---
.../java/org/apache/iotdb/cli/AbstractCli.java | 9 +-
.../iotdb/db/integration/IoTDBTracingIT.java | 4 +-
.../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 843 ++++++++------
.../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 1163 +++++++++++++++++++-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 26 +-
.../iotdb/jdbc/IoTDBDatabaseMetadataTest.java | 4 +-
.../apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java | 77 +-
.../iotdb/jdbc/IoTDBPreparedStatementTest.java | 34 +-
.../db/mpp/execution/exchange/ISourceHandle.java | 10 +
.../mpp/execution/exchange/LocalSourceHandle.java | 21 +
.../db/mpp/execution/exchange/SourceHandle.java | 22 +-
.../db/mpp/plan/execution/IQueryExecution.java | 3 +
.../db/mpp/plan/execution/QueryExecution.java | 33 +-
.../mpp/plan/execution/config/ConfigExecution.java | 18 +
.../plan/execution/memory/MemorySourceHandle.java | 22 +
.../service/thrift/impl/ClientRPCServiceImpl.java | 450 ++++----
.../db/service/thrift/impl/TSServiceImpl.java | 32 +
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 27 +
service-rpc/pom.xml | 8 +
.../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 345 +++---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../apache/iotdb/session/SessionConnection.java | 22 +-
.../org/apache/iotdb/session/SessionDataSet.java | 29 +-
thrift/src/main/thrift/client.thrift | 20 +-
24 files changed, 2440 insertions(+), 783 deletions(-)
diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 309a5bc85c..09dd0d9b3c 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cli;
import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.jdbc.AbstractIoTDBJDBCResultSet;
import org.apache.iotdb.jdbc.IoTDBConnection;
import org.apache.iotdb.jdbc.IoTDBJDBCResultSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -555,7 +554,7 @@ public abstract class AbstractCli {
}
}
// output tracing activity
- if (((AbstractIoTDBJDBCResultSet) resultSet).isSetTracingInfo()) {
+ if (((IoTDBJDBCResultSet) resultSet).isSetTracingInfo()) {
maxSizeList = new ArrayList<>(2);
lists = cacheTracingInfo(resultSet, maxSizeList);
outputTracingInfo(lists, maxSizeList);
@@ -682,8 +681,8 @@ public abstract class AbstractCli {
maxSizeList.add(0, ACTIVITY_STR.length());
maxSizeList.add(1, ELAPSED_TIME_STR.length());
- List<String> activityList = ((AbstractIoTDBJDBCResultSet) resultSet).getActivityList();
- List<Long> elapsedTimeList = ((AbstractIoTDBJDBCResultSet) resultSet).getElapsedTimeList();
+ List<String> activityList = ((IoTDBJDBCResultSet) resultSet).getActivityList();
+ List<Long> elapsedTimeList = ((IoTDBJDBCResultSet) resultSet).getElapsedTimeList();
String[] statisticsInfoList = {
"seriesPathNum", "seqFileNum", "unSeqFileNum", "seqChunkInfo", "unSeqChunkInfo", "pageNumInfo"
};
@@ -693,7 +692,7 @@ public abstract class AbstractCli {
if (i == activityList.size() - 1) {
// cache Statistics
for (String infoName : statisticsInfoList) {
- String info = ((AbstractIoTDBJDBCResultSet) resultSet).getStatisticsInfoByName(infoName);
+ String info = ((IoTDBJDBCResultSet) resultSet).getStatisticsInfoByName(infoName);
lists.get(0).add(info);
lists.get(1).add("");
if (info.length() > maxSizeList.get(0)) {
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTracingIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTracingIT.java
index 98dadd7180..7743d60014 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTracingIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTracingIT.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.integration;
import org.apache.iotdb.integration.env.EnvFactory;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
-import org.apache.iotdb.jdbc.AbstractIoTDBJDBCResultSet;
+import org.apache.iotdb.jdbc.IoTDBJDBCResultSet;
import org.junit.After;
import org.junit.Assert;
@@ -68,7 +68,7 @@ public class IoTDBTracingIT {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute("tracing select s1 from root.sg_tracing.d1");
- AbstractIoTDBJDBCResultSet resultSet = (AbstractIoTDBJDBCResultSet) statement.getResultSet();
+ IoTDBJDBCResultSet resultSet = (IoTDBJDBCResultSet) statement.getResultSet();
Assert.assertTrue(resultSet.isSetTracingInfo());
Assert.assertEquals(1, resultSet.getStatisticsByName("seriesPathNum"));
Assert.assertEquals(1, resultSet.getStatisticsByName("seqFileNum"));
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index 3f65829054..f900c32c99 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -27,6 +27,9 @@ import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -47,6 +50,7 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -69,6 +73,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
private long sessionId;
private WatermarkEncoder groupedLSBWatermarkEncoder;
private static String sqlKeywordsThatArentSQL92;
+ private static TsBlockSerde serde = new TsBlockSerde();
IoTDBDatabaseMetadata(
IoTDBConnection connection, IClientRPCService.Iface client, long sessionId) {
@@ -219,47 +224,241 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
"WATERMARK_EMBEDDING"
};
String[] sql92Keywords = {
- "ABSOLUTE", "EXEC", "OVERLAPS", "ACTION", "EXECUTE", "PAD", "ADA", "EXISTS", "PARTIAL", "ADD",
- "EXTERNAL", "PASCAL", "ALL", "EXTRACT", "POSITION", "ALLOCATE", "FALSE", "PRECISION", "ALTER",
- "FETCH",
- "PREPARE", "AND", "FIRST", "PRESERVE", "ANY", "FLOAT", "PRIMARY", "ARE", "FOR", "PRIOR",
- "AS", "FOREIGN", "PRIVILEGES", "ASC", "FORTRAN", "PROCEDURE", "ASSERTION", "FOUND", "PUBLIC",
- "AT",
- "FROM", "READ", "AUTHORIZATION", "FULL", "REAL", "AVG", "GET", "REFERENCES", "BEGIN",
- "GLOBAL",
- "RELATIVE", "BETWEEN", "GO", "RESTRICT", "BIT", "GOTO", "REVOKE", "BIT_LENGTH", "GRANT",
- "RIGHT",
- "BOTH", "GROUP", "ROLLBACK", "BY", "HAVING", "ROWS", "CASCADE", "HOUR", "SCHEMA", "CASCADED",
- "IDENTITY", "SCROLL", "CASE", "IMMEDIATE", "SECOND", "CAST", "IN", "SECTION", "CATALOG",
- "INCLUDE",
- "SELECT", "CHAR", "INDEX", "SESSION", "CHAR_LENGTH", "INDICATOR", "SESSION_USER", "CHARACTER",
- "INITIALLY", "SET",
- "CHARACTER_LENGTH", "INNER", "SIZE", "CHECK", "INPUT", "SMALLINT", "CLOSE", "INSENSITIVE",
- "SOME", "COALESCE",
- "INSERT", "SPACE", "COLLATE", "INT", "SQL", "COLLATION", "INTEGER", "SQLCA", "COLUMN",
- "INTERSECT",
- "SQLCODE", "COMMIT", "INTERVAL", "SQLERROR", "CONNECT", "INTO", "SQLSTATE", "CONNECTION",
- "IS", "SQLWARNING",
- "CONSTRAINT", "ISOLATION", "SUBSTRING", "CONSTRAINTS", "JOIN", "SUM", "CONTINUE", "KEY",
- "SYSTEM_USER", "CONVERT",
- "LANGUAGE", "TABLE", "CORRESPONDING", "LAST", "TEMPORARY", "COUNT", "LEADING", "THEN",
- "CREATE", "LEFT",
- "TIME", "CROSS", "LEVEL", "TIMESTAMP", "CURRENT", "LIKE", "TIMEZONE_HOUR", "CURRENT_DATE",
- "LOCAL", "TIMEZONE_MINUTE",
- "CURRENT_TIME", "LOWER", "TO", "CURRENT_TIMESTAMP", "MATCH", "TRAILING", "CURRENT_USER",
- "MAX", "TRANSACTION", "CURSOR",
- "MIN", "TRANSLATE", "DATE", "MINUTE", "TRANSLATION", "DAY", "MODULE", "TRIM", "DEALLOCATE",
- "MONTH",
- "TRUE", "DEC", "NAMES", "UNION", "DECIMAL", "NATIONAL", "UNIQUE", "DECLARE", "NATURAL",
- "UNKNOWN",
- "DEFAULT", "NCHAR", "UPDATE", "DEFERRABLE", "NEXT", "UPPER", "DEFERRED", "NO", "USAGE",
- "DELETE",
- "NONE", "USER", "DESC", "NOT", "USING", "DESCRIBE", "NULL", "VALUE", "DESCRIPTOR", "NULLIF",
- "VALUES", "DIAGNOSTICS", "NUMERIC", "VARCHAR", "DISCONNECT", "OCTET_LENGTH", "VARYING",
- "DISTINCT", "OF", "VIEW",
- "DOMAIN", "ON", "WHEN", "DOUBLE", "ONLY", "WHENEVER", "DROP", "OPEN", "WHERE", "ELSE",
- "OPTION", "WITH", "END", "OR", "WORK", "END-EXEC", "ORDER", "WRITE", "ESCAPE", "OUTER",
- "YEAR", "EXCEPT", "OUTPUT", "ZONE", "EXCEPTION"
+ "ABSOLUTE",
+ "EXEC",
+ "OVERLAPS",
+ "ACTION",
+ "EXECUTE",
+ "PAD",
+ "ADA",
+ "EXISTS",
+ "PARTIAL",
+ "ADD",
+ "EXTERNAL",
+ "PASCAL",
+ "ALL",
+ "EXTRACT",
+ "POSITION",
+ "ALLOCATE",
+ "FALSE",
+ "PRECISION",
+ "ALTER",
+ "FETCH",
+ "PREPARE",
+ "AND",
+ "FIRST",
+ "PRESERVE",
+ "ANY",
+ "FLOAT",
+ "PRIMARY",
+ "ARE",
+ "FOR",
+ "PRIOR",
+ "AS",
+ "FOREIGN",
+ "PRIVILEGES",
+ "ASC",
+ "FORTRAN",
+ "PROCEDURE",
+ "ASSERTION",
+ "FOUND",
+ "PUBLIC",
+ "AT",
+ "FROM",
+ "READ",
+ "AUTHORIZATION",
+ "FULL",
+ "REAL",
+ "AVG",
+ "GET",
+ "REFERENCES",
+ "BEGIN",
+ "GLOBAL",
+ "RELATIVE",
+ "BETWEEN",
+ "GO",
+ "RESTRICT",
+ "BIT",
+ "GOTO",
+ "REVOKE",
+ "BIT_LENGTH",
+ "GRANT",
+ "RIGHT",
+ "BOTH",
+ "GROUP",
+ "ROLLBACK",
+ "BY",
+ "HAVING",
+ "ROWS",
+ "CASCADE",
+ "HOUR",
+ "SCHEMA",
+ "CASCADED",
+ "IDENTITY",
+ "SCROLL",
+ "CASE",
+ "IMMEDIATE",
+ "SECOND",
+ "CAST",
+ "IN",
+ "SECTION",
+ "CATALOG",
+ "INCLUDE",
+ "SELECT",
+ "CHAR",
+ "INDEX",
+ "SESSION",
+ "CHAR_LENGTH",
+ "INDICATOR",
+ "SESSION_USER",
+ "CHARACTER",
+ "INITIALLY",
+ "SET",
+ "CHARACTER_LENGTH",
+ "INNER",
+ "SIZE",
+ "CHECK",
+ "INPUT",
+ "SMALLINT",
+ "CLOSE",
+ "INSENSITIVE",
+ "SOME",
+ "COALESCE",
+ "INSERT",
+ "SPACE",
+ "COLLATE",
+ "INT",
+ "SQL",
+ "COLLATION",
+ "INTEGER",
+ "SQLCA",
+ "COLUMN",
+ "INTERSECT",
+ "SQLCODE",
+ "COMMIT",
+ "INTERVAL",
+ "SQLERROR",
+ "CONNECT",
+ "INTO",
+ "SQLSTATE",
+ "CONNECTION",
+ "IS",
+ "SQLWARNING",
+ "CONSTRAINT",
+ "ISOLATION",
+ "SUBSTRING",
+ "CONSTRAINTS",
+ "JOIN",
+ "SUM",
+ "CONTINUE",
+ "KEY",
+ "SYSTEM_USER",
+ "CONVERT",
+ "LANGUAGE",
+ "TABLE",
+ "CORRESPONDING",
+ "LAST",
+ "TEMPORARY",
+ "COUNT",
+ "LEADING",
+ "THEN",
+ "CREATE",
+ "LEFT",
+ "TIME",
+ "CROSS",
+ "LEVEL",
+ "TIMESTAMP",
+ "CURRENT",
+ "LIKE",
+ "TIMEZONE_HOUR",
+ "CURRENT_DATE",
+ "LOCAL",
+ "TIMEZONE_MINUTE",
+ "CURRENT_TIME",
+ "LOWER",
+ "TO",
+ "CURRENT_TIMESTAMP",
+ "MATCH",
+ "TRAILING",
+ "CURRENT_USER",
+ "MAX",
+ "TRANSACTION",
+ "CURSOR",
+ "MIN",
+ "TRANSLATE",
+ "DATE",
+ "MINUTE",
+ "TRANSLATION",
+ "DAY",
+ "MODULE",
+ "TRIM",
+ "DEALLOCATE",
+ "MONTH",
+ "TRUE",
+ "DEC",
+ "NAMES",
+ "UNION",
+ "DECIMAL",
+ "NATIONAL",
+ "UNIQUE",
+ "DECLARE",
+ "NATURAL",
+ "UNKNOWN",
+ "DEFAULT",
+ "NCHAR",
+ "UPDATE",
+ "DEFERRABLE",
+ "NEXT",
+ "UPPER",
+ "DEFERRED",
+ "NO",
+ "USAGE",
+ "DELETE",
+ "NONE",
+ "USER",
+ "DESC",
+ "NOT",
+ "USING",
+ "DESCRIBE",
+ "NULL",
+ "VALUE",
+ "DESCRIPTOR",
+ "NULLIF",
+ "VALUES",
+ "DIAGNOSTICS",
+ "NUMERIC",
+ "VARCHAR",
+ "DISCONNECT",
+ "OCTET_LENGTH",
+ "VARYING",
+ "DISTINCT",
+ "OF",
+ "VIEW",
+ "DOMAIN",
+ "ON",
+ "WHEN",
+ "DOUBLE",
+ "ONLY",
+ "WHENEVER",
+ "DROP",
+ "OPEN",
+ "WHERE",
+ "ELSE",
+ "OPTION",
+ "WITH",
+ "END",
+ "OR",
+ "WORK",
+ "END-EXEC",
+ "ORDER",
+ "WRITE",
+ "ESCAPE",
+ "OUTER",
+ "YEAR",
+ "EXCEPT",
+ "OUTPUT",
+ "ZONE",
+ "EXCEPTION"
};
TreeMap myKeywordMap = new TreeMap();
for (int i = 0; i < allIotdbSQLKeywords.length; i++) {
@@ -362,9 +561,9 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
@Override
public ResultSet getAttributes(String arg0, String arg1, String arg2, String arg3)
throws SQLException {
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
Statement stmt = connection.createStatement();
try {
Field[] fields = new Field[21];
@@ -418,9 +617,9 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
@Override
public ResultSet getBestRowIdentifier(
String arg0, String arg1, String arg2, int arg3, boolean arg4) throws SQLException {
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
Statement stmt = connection.createStatement();
try {
Field[] fields = new Field[8];
@@ -474,28 +673,26 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
public ResultSet getCatalogs() throws SQLException {
Statement stmt = this.connection.createStatement();
ResultSet rs = stmt.executeQuery("SHOW STORAGE GROUP ");
- Field[] fields = new Field[1];
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigpaths = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<TSDataType> tsDataTypeList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ tsDataTypeList.add(TSDataType.TEXT);
+
while (rs.next()) {
- List<Map> paths = new ArrayList<Map>();
- Map<String, Object> m = new HashMap<>();
- m.put("type", TSDataType.TEXT);
- m.put("val", rs.getString(1));
- paths.add(m);
- bigpaths.add(paths);
- }
- addToDataSet(bigpaths, dataSet);
+ List<Object> values = new ArrayList<>();
+ values.add(rs.getString(1));
+ valuesList.add(values);
+ }
columnNameList.add("TYPE_CAT");
columnTypeList.add("TEXT");
columnNameIndex.put("TYPE_CAT", 0);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -511,7 +708,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -634,6 +831,43 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
return tsQueryDataSet;
}
+ public static ByteBuffer convertTsBlock(
+ List<List<Object>> valuesList, List<TSDataType> tsDataTypeList) throws IOException {
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(tsDataTypeList);
+ for (List<Object> valuesInRow : valuesList) {
+ for (int j = 0; j < tsDataTypeList.size(); j++) {
+ TSDataType columnType = tsDataTypeList.get(j);
+ switch (columnType) {
+ case TEXT:
+ tsBlockBuilder.getColumnBuilder(j).writeBinary((Binary) valuesInRow.get(j));
+ break;
+ case FLOAT:
+ tsBlockBuilder.getColumnBuilder(j).writeFloat((float) valuesInRow.get(j));
+ break;
+ case INT32:
+ tsBlockBuilder.getColumnBuilder(j).writeInt((int) valuesInRow.get(j));
+ break;
+ case INT64:
+ tsBlockBuilder.getColumnBuilder(j).writeLong((long) valuesInRow.get(j));
+ break;
+ case DOUBLE:
+ tsBlockBuilder.getColumnBuilder(j).writeDouble((double) valuesInRow.get(j));
+ break;
+ case BOOLEAN:
+ tsBlockBuilder.getColumnBuilder(j).writeBoolean((boolean) valuesInRow.get(j));
+ break;
+ }
+ }
+ tsBlockBuilder.declarePosition();
+ }
+ TsBlock tsBlock = tsBlockBuilder.build();
+ if (tsBlock == null) {
+ return null;
+ } else {
+ return serde.serialize(tsBlock);
+ }
+ }
+
private void addToDataSet(List<List<Map>> listbigPaths, ListDataSet dataSet) {
List<TSDataType> listType = new ArrayList<>();
int i = 0;
@@ -680,35 +914,31 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
public ResultSet getClientInfoProperties() throws SQLException {
Statement stmt = this.connection.createStatement();
ResultSet rs = stmt.executeQuery("SHOW STORAGE GROUP ");
+
Field[] fields = new Field[4];
fields[0] = new Field("", "NAME", "TEXT");
fields[1] = new Field("", "MAX_LEN", "INT32");
fields[2] = new Field("", "DEFAULT_VALUE", "INT32");
fields[3] = new Field("", "DESCRIPTION", "TEXT");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32, TSDataType.TEXT);
- List<Object> listVal = Arrays.asList("fetch_size", 10, 10, "");
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- List<Map> properties = new ArrayList<Map>();
- ListDataSet dataSet = new ListDataSet();
+ List<Object> values = Arrays.asList("fetch_size", 10, 10, "");
+
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
+ valuesList.add(values);
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
- m.put("val", listVal.get(i));
- properties.add(m);
- }
- bigproperties.add(properties);
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+ }
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -724,7 +954,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -776,7 +1006,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[5] = new Field("", "GRANTEE", "TEXT");
fields[6] = new Field("", "PRIVILEGE", "TEXT");
fields[7] = new Field("", "IS_GRANTABLE", "TEXT");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
@@ -786,42 +1016,38 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT);
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
}
while (rs.next()) {
- List<Map> properties = new ArrayList<Map>();
+ List<Object> valuesInRow = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
if (i < 4) {
- m.put("val", rs.getString(1));
+ valuesInRow.add(rs.getString(1));
} else if (i == 5) {
- m.put("val", getUserName());
+ valuesInRow.add(getUserName());
} else if (i == 6) {
- m.put("val", "");
+ valuesInRow.add("");
} else if (i == 7) {
- m.put("val", "NO");
+ valuesInRow.add("NO");
} else {
- m.put("val", "");
+ valuesInRow.add("");
}
-
- properties.add(m);
}
- bigproperties.add(properties);
+ valuesList.add(valuesInRow);
}
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -837,7 +1063,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -852,9 +1078,9 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
public ResultSet getCrossReference(
String arg0, String arg1, String arg2, String arg3, String arg4, String arg5)
throws SQLException {
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
Statement stmt = connection.createStatement();
try {
Field[] fields = new Field[14];
@@ -1044,7 +1270,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[14] = new Field("", "ORDINAL_POSITION", "INT32");
fields[15] = new Field("", "IS_NULLABLE", "TEXT");
fields[16] = new Field("", "SPECIFIC_NAME", "TEXT");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
@@ -1063,37 +1289,34 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSDataType.INT32,
TSDataType.TEXT,
TSDataType.TEXT);
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
}
while (rs.next()) {
- List<Map> properties = new ArrayList<Map>();
+ List<Object> valuesInRow = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
if (i == 2) {
- m.put("val", rs.getString(1));
+ valuesInRow.add(rs.getString(1));
} else if ("INT32".equals(fields[i].getSqlType())) {
- m.put("val", 0);
+ valuesInRow.add(0);
} else {
- m.put("val", "");
+ valuesInRow.add("");
}
- properties.add(m);
}
- bigproperties.add(properties);
+ valuesList.add(valuesInRow);
}
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -1109,7 +1332,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -1127,7 +1350,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[3] = new Field("", "REMARKS", "TEXT");
fields[4] = new Field("", "FUNCTION_TYPE", "INT32");
fields[5] = new Field("", "SPECIFIC_NAME", "TEXT");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
@@ -1135,38 +1358,34 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSDataType.TEXT,
TSDataType.INT32,
TSDataType.TEXT);
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
}
while (rs.next()) {
- List<Map> properties = new ArrayList<Map>();
+ List<Object> valueInRow = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
if (i == 2) {
- m.put("val", rs.getString(1));
+ valueInRow.add(rs.getString(1));
} else if (i == 4) {
- m.put("val", 0);
+ valueInRow.add(0);
} else {
- m.put("val", "");
+ valueInRow.add("");
}
-
- properties.add(m);
}
- bigproperties.add(properties);
+ valuesList.add(valueInRow);
}
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -1182,7 +1401,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -1442,10 +1661,10 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
@Override
public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException {
Statement stmt = connection.createStatement();
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<TSDataType> listType =
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
@@ -1453,11 +1672,6 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSDataType.TEXT,
TSDataType.INT32,
TSDataType.TEXT);
- List<Object> listValSub_1 = Arrays.asList(catalog, "", table, "time", 1, "PRIMARY");
- List<Object> listValSub_2 = Arrays.asList(catalog, "", table, "deivce", 2, "PRIMARY");
- List<List<Object>> listVal = Arrays.asList(listValSub_1, listValSub_2);
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
Field[] fields = new Field[6];
fields[0] = new Field("", "TABLE_CAT", "TEXT");
fields[1] = new Field("", "TABLE_SCHEM", "TEXT");
@@ -1465,26 +1679,19 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[3] = new Field("", "COLUMN_NAME", "TEXT");
fields[4] = new Field("", "KEY_SEQ", "INT32");
fields[5] = new Field("", "PK_NAME", "TEXT");
+
+ List<Object> listValSub_1 = Arrays.asList(catalog, "", table, "time", 1, "PRIMARY");
+ List<Object> listValSub_2 = Arrays.asList(catalog, "", table, "deivce", 2, "PRIMARY");
+ List<List<Object>> valuesList = Arrays.asList(listValSub_1, listValSub_2);
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
}
- for (List<Object> listob : listVal) {
- List<Map> properties = new ArrayList<Map>();
- for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
- m.put("val", listob.get(i));
- properties.add(m);
- }
- bigproperties.add(properties);
- }
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -1500,7 +1707,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -1509,9 +1716,9 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
@Override
public ResultSet getProcedureColumns(String arg0, String arg1, String arg2, String arg3)
throws SQLException {
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
Statement stmt = connection.createStatement();
try {
@@ -1569,9 +1776,9 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
@Override
public ResultSet getProcedures(String arg0, String arg1, String arg2) throws SQLException {
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
Statement stmt = connection.createStatement();
try {
Field[] fields = new Field[6];
@@ -1625,7 +1832,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[9] = new Field("", "REMARKS", "TEXT");
fields[10] = new Field("", "CHAR_OCTET_LENGTH", "INT32");
fields[11] = new Field("", "IS_NULLABLE", "TEXT");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
@@ -1639,30 +1846,23 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSDataType.TEXT,
TSDataType.INT32,
TSDataType.TEXT);
- List<Object> listVal =
+
+ List<Object> value =
Arrays.asList(
catalog, catalog, tableNamePattern, "times", Types.BIGINT, 1, 0, 2, "", "", 13, "NO");
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- List<Map> properties = new ArrayList<Map>();
- ListDataSet dataSet = new ListDataSet();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
- m.put("val", listVal.get(i));
- properties.add(m);
- }
- bigproperties.add(properties);
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+ }
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(Collections.singletonList(value), tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -1678,7 +1878,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -1716,32 +1916,29 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
Field[] fields = new Field[2];
fields[0] = new Field("", "TABLE_SCHEM", "TEXT");
fields[1] = new Field("", "TABLE_CATALOG", "TEXT");
- List<TSDataType> listType = Arrays.asList(TSDataType.TEXT, TSDataType.TEXT);
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+
+ List<TSDataType> tsDataTypeList = Arrays.asList(TSDataType.TEXT, TSDataType.TEXT);
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
}
while (rs.next()) {
- List<Map> properties = new ArrayList<Map>();
+ List<Object> valueInRow = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
- m.put("val", rs.getString(1));
- properties.add(m);
+ valueInRow.add(rs.getString(1));
}
- bigproperties.add(properties);
+ valuesList.add(valueInRow);
}
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -1757,7 +1954,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -1781,8 +1978,8 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
@Override
public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern)
throws SQLException {
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
Statement stmt = connection.createStatement();
try {
@@ -1820,9 +2017,9 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
@Override
public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern)
throws SQLException {
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
Statement stmt = connection.createStatement();
try {
Field[] fields = new Field[6];
@@ -1919,7 +2116,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[5] = new Field("", "GRANTEE", "TEXT");
fields[6] = new Field("", "PRIVILEGE", "TEXT");
fields[7] = new Field("", "IS_GRANTABLE", "TEXT");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
@@ -1929,42 +2126,38 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT);
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
}
while (rs.next()) {
- List<Map> properties = new ArrayList<Map>();
+ List<Object> valueInRow = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
if (i < 4) {
- m.put("val", rs.getString(1));
+ valueInRow.add(rs.getString(1));
} else if (i == 5) {
- m.put("val", getUserName());
+ valueInRow.add(getUserName());
} else if (i == 6) {
- m.put("val", "");
+ valueInRow.add("");
} else if (i == 7) {
- m.put("val", "NO");
+ valueInRow.add("NO");
} else {
- m.put("val", "");
+ valueInRow.add("");
}
-
- properties.add(m);
}
- bigproperties.add(properties);
+ valuesList.add(valueInRow);
}
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -1980,7 +2173,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -1989,25 +2182,22 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
@Override
public ResultSet getTableTypes() throws SQLException {
Statement stmt = this.connection.createStatement();
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigpaths = new ArrayList<List<Map>>();
- List<Map> paths = new ArrayList<Map>();
- ListDataSet dataSet = new ListDataSet();
- Map<String, Object> m = new HashMap<>();
- m.put("type", TSDataType.TEXT);
- m.put("val", "table");
- paths.add(m);
- bigpaths.add(paths);
- addToDataSet(bigpaths, dataSet);
+
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<TSDataType> tsDataTypeList = new ArrayList<>();
+ List<Object> value = new ArrayList<>();
+
+ tsDataTypeList.add(TSDataType.TEXT);
+ value.add("table");
columnNameList.add("TABLE_TYPE");
columnTypeList.add("TEXT");
columnNameIndex.put("TABLE_TYPE", 0);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(Collections.singletonList(value), tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -2023,7 +2213,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -2092,7 +2282,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[22] = new Field("", "IS_AUTOINCREMENT", "TEXT");
fields[23] = new Field("", "IS_GENERATEDCOLUMN", "TEXT");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
@@ -2118,74 +2308,68 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSDataType.INT32,
TSDataType.TEXT,
TSDataType.TEXT);
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<List<Object>> valuesList = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
}
while (rs.next()) {
- List<Map> properties = new ArrayList<Map>();
+ List<Object> valuesInRow = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
if (i < 4) {
- m.put("val", rs.getString(1));
+ valuesInRow.add(1);
} else if (i == 4) {
- m.put("val", getSQLType(fields[i].getSqlType()));
+ valuesInRow.add(getSQLType(fields[i].getSqlType()));
} else if (i == 6) {
- m.put("val", getTypePrecision(fields[i].getSqlType()));
+ valuesInRow.add(getTypePrecision(fields[i].getSqlType()));
} else if (i == 7) {
- m.put("val", 0);
+ valuesInRow.add(0);
} else if (i == 8) {
- m.put("val", getTypeScale(fields[i].getSqlType()));
+ valuesInRow.add(getTypeScale(fields[i].getSqlType()));
} else if (i == 9) {
- m.put("val", 10);
+ valuesInRow.add(10);
} else if (i == 10) {
- m.put("val", 0);
+ valuesInRow.add(0);
} else if (i == 11) {
- m.put("val", "");
+ valuesInRow.add("");
} else if (i == 12) {
- m.put("val", "");
+ valuesInRow.add("");
} else if (i == 13) {
- m.put("val", 0);
+ valuesInRow.add(0);
} else if (i == 14) {
- m.put("val", 0);
+ valuesInRow.add(0);
} else if (i == 15) {
- m.put("val", getTypePrecision(fields[i].getSqlType()));
+ valuesInRow.add(getTypePrecision(fields[i].getSqlType()));
} else if (i == 16) {
- m.put("val", 1);
+ valuesInRow.add(1);
} else if (i == 17) {
- m.put("val", "NO");
+ valuesInRow.add("NO");
} else if (i == 18) {
- m.put("val", "");
+ valuesInRow.add("");
} else if (i == 19) {
- m.put("val", "");
+ valuesInRow.add("");
} else if (i == 20) {
- m.put("val", "");
+ valuesInRow.add("");
} else if (i == 21) {
- m.put("val", 0);
+ valuesInRow.add(0);
} else if (i == 22) {
- m.put("val", "NO");
+ valuesInRow.add("NO");
} else if (i == 23) {
- m.put("val", "NO");
+ valuesInRow.add("NO");
} else {
- m.put("val", "");
+ valuesInRow.add("");
}
-
- properties.add(m);
}
- bigproperties.add(properties);
+ valuesList.add(valuesInRow);
}
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -2201,7 +2385,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -2323,7 +2507,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[8] = new Field("", "SELF_REFERENCING_COL_NAME", "TEXT");
fields[9] = new Field("", "REF_GENERATION", "TEXT");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
@@ -2335,39 +2519,36 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT);
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
}
while (rs.next()) {
- List<Map> properties = new ArrayList<Map>();
+ List<Object> valueInRow = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
if (i < 2) {
- m.put("val", rs.getString(3));
+ valueInRow.add(rs.getString(3));
} else if (i == 2) {
- m.put("val", rs.getString(1));
+ valueInRow.add(rs.getString(1));
} else if (i == 3) {
- m.put("val", "TABLE");
+ valueInRow.add("TABLE");
} else {
- m.put("val", "");
+ valueInRow.add("");
}
- properties.add(m);
}
- bigproperties.add(properties);
+ valuesList.add(valueInRow);
}
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
}
@@ -2381,7 +2562,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
@@ -2414,7 +2595,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
fields[15] = new Field("", "SQL_DATA_TYPE", "INT32");
fields[16] = new Field("", "SQL_DATETIME_SUB", "INT32");
fields[17] = new Field("", "NUM_PREC_RADIX", "INT32");
- List<TSDataType> listType =
+ List<TSDataType> tsDataTypeList =
Arrays.asList(
TSDataType.TEXT,
TSDataType.INT32,
@@ -2554,35 +2735,21 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
0,
0,
10);
- List<List<Object>> listVal =
+ List<List<Object>> valuesList =
Arrays.asList(
listValSub_1, listValSub_2, listValSub_3, listValSub_4, listValSub_5, listValSub_6);
- List<String> columnNameList = new ArrayList<String>();
- List<String> columnTypeList = new ArrayList<String>();
- Map<String, Integer> columnNameIndex = new HashMap<String, Integer>();
- List<List<Map>> bigproperties = new ArrayList<List<Map>>();
- ListDataSet dataSet = new ListDataSet();
+ List<String> columnNameList = new ArrayList<>();
+ List<String> columnTypeList = new ArrayList<>();
+ Map<String, Integer> columnNameIndex = new HashMap<>();
for (int i = 0; i < fields.length; i++) {
columnNameList.add(fields[i].getName());
columnTypeList.add(fields[i].getSqlType());
columnNameIndex.put(fields[i].getName(), i);
- Map<String, Object> m = new HashMap<>();
}
- for (List<Object> listob : listVal) {
- List<Map> properties = new ArrayList<Map>();
- for (int i = 0; i < fields.length; i++) {
- Map<String, Object> m = new HashMap<>();
- m.put("type", listType.get(i));
- m.put("val", listob.get(i));
- properties.add(m);
- }
- bigproperties.add(properties);
- }
- addToDataSet(bigproperties, dataSet);
- TSQueryDataSet tsdataset = null;
+
+ ByteBuffer tsBlock = null;
try {
- tsdataset =
- convertQueryDataSetByFetchSize(dataSet, stmt.getFetchSize(), getWatermarkEncoder());
+ tsBlock = convertTsBlock(valuesList, tsDataTypeList);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -2598,7 +2765,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
null,
0,
sessionId,
- tsdataset,
+ Collections.singletonList(tsBlock),
null,
(long) 60 * 1000,
false);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
index f79131b634..1222ad128e 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
@@ -20,18 +20,48 @@
package org.apache.iotdb.jdbc;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.IoTDBRpcDataSet;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
+import org.apache.thrift.TException;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.BitSet;
+import java.util.Calendar;
import java.util.List;
import java.util.Map;
-public class IoTDBJDBCResultSet extends AbstractIoTDBJDBCResultSet {
+public class IoTDBJDBCResultSet implements ResultSet {
+
+ protected Statement statement;
+ protected SQLWarning warningChain = null;
+ protected List<String> columnTypeList;
+ protected IoTDBRpcDataSet ioTDBRpcDataSet;
+ protected IoTDBTracingInfo ioTDBRpcTracingInfo;
+ private boolean isRpcFetchResult = true;
+
private String operationType = "";
private List<String> columns = null;
private List<String> sgColumns = null;
@@ -46,7 +76,7 @@ public class IoTDBJDBCResultSet extends AbstractIoTDBJDBCResultSet {
String sql,
long queryId,
long sessionId,
- TSQueryDataSet dataset,
+ List<ByteBuffer> dataset,
TSTracingInfo tracingInfo,
long timeout,
String operationType,
@@ -54,20 +84,25 @@ public class IoTDBJDBCResultSet extends AbstractIoTDBJDBCResultSet {
List<String> sgColumns,
BitSet aliasColumnMap)
throws SQLException {
- super(
- statement,
- columnNameList,
- columnTypeList,
- columnNameIndex,
- ignoreTimeStamp,
- client,
- sql,
- queryId,
- sessionId,
- timeout,
- sgColumns,
- aliasColumnMap);
- ioTDBRpcDataSet.setTsQueryDataSet(dataset);
+ this.ioTDBRpcDataSet =
+ new IoTDBRpcDataSet(
+ sql,
+ columnNameList,
+ columnTypeList,
+ columnNameIndex,
+ ignoreTimeStamp,
+ true,
+ queryId,
+ ((IoTDBStatement) statement).getStmtId(),
+ client,
+ sessionId,
+ dataset,
+ statement.getFetchSize(),
+ timeout,
+ sgColumns,
+ aliasColumnMap);
+ this.statement = statement;
+ this.columnTypeList = columnTypeList;
if (tracingInfo != null) {
ioTDBRpcTracingInfo = new IoTDBTracingInfo();
ioTDBRpcTracingInfo.setTsTracingInfo(tracingInfo);
@@ -87,24 +122,29 @@ public class IoTDBJDBCResultSet extends AbstractIoTDBJDBCResultSet {
String sql,
long queryId,
long sessionId,
- TSQueryDataSet dataset,
+ List<ByteBuffer> dataSet,
TSTracingInfo tracingInfo,
long timeout,
boolean isRpcFetchResult)
throws SQLException {
- super(
- statement,
- columnNameList,
- columnTypeList,
- columnNameIndex,
- ignoreTimeStamp,
- client,
- sql,
- queryId,
- sessionId,
- timeout,
- isRpcFetchResult);
- ioTDBRpcDataSet.setTsQueryDataSet(dataset);
+ this.ioTDBRpcDataSet =
+ new IoTDBRpcDataSet(
+ sql,
+ columnNameList,
+ columnTypeList,
+ columnNameIndex,
+ ignoreTimeStamp,
+ isRpcFetchResult,
+ queryId,
+ ((IoTDBStatement) statement).getStmtId(),
+ client,
+ sessionId,
+ dataSet,
+ statement.getFetchSize(),
+ timeout);
+ this.statement = statement;
+ this.columnTypeList = columnTypeList;
+ this.isRpcFetchResult = isRpcFetchResult;
if (tracingInfo != null) {
ioTDBRpcTracingInfo = new IoTDBTracingInfo();
ioTDBRpcTracingInfo.setTsTracingInfo(tracingInfo);
@@ -112,60 +152,1091 @@ public class IoTDBJDBCResultSet extends AbstractIoTDBJDBCResultSet {
}
@Override
- public long getLong(String columnName) throws SQLException {
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean absolute(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void afterLast() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void beforeFirst() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void cancelRowUpdates() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void close() throws SQLException {
try {
- return ioTDBRpcDataSet.getLong(columnName);
+ ioTDBRpcDataSet.close();
+ } catch (StatementExecutionException e) {
+ throw new SQLException("Error occurs for close operation in server side because ", e);
+ } catch (TException e) {
+ throw new SQLException("Error occurs when connecting to server for close operation ", e);
+ }
+ }
+
+ @Override
+ public void deleteRow() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public int findColumn(String columnName) {
+ return ioTDBRpcDataSet.findColumn(columnName);
+ }
+
+ @Override
+ public boolean first() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Array getArray(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Array getArray(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public InputStream getAsciiStream(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public InputStream getAsciiStream(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+ try {
+ return getBigDecimal(ioTDBRpcDataSet.findColumnNameByIndex(columnIndex));
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String columnName) throws SQLException {
+ String value = getValueByName(columnName);
+ if (value != null) {
+ return new BigDecimal(value);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+ MathContext mc = new MathContext(scale);
+ return getBigDecimal(columnIndex).round(mc);
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String columnName, int scale) throws SQLException {
+ return getBigDecimal(findColumn(columnName), scale);
+ }
+
+ @Override
+ public InputStream getBinaryStream(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public InputStream getBinaryStream(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Blob getBlob(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Blob getBlob(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean getBoolean(int columnIndex) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getBoolean(columnIndex);
} catch (StatementExecutionException e) {
throw new SQLException(e.getMessage());
}
}
@Override
- protected boolean fetchResults() throws SQLException {
+ public boolean getBoolean(String columnName) throws SQLException {
try {
- return ioTDBRpcDataSet.fetchResults();
- } catch (StatementExecutionException | IoTDBConnectionException e) {
+ return ioTDBRpcDataSet.getBoolean(columnName);
+ } catch (StatementExecutionException e) {
throw new SQLException(e.getMessage());
}
}
@Override
- protected boolean hasCachedResults() {
- return ioTDBRpcDataSet.hasCachedResults();
+ public byte getByte(int columnIndex) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public byte getByte(String columnName) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public byte[] getBytes(int columnIndex) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public byte[] getBytes(String columnName) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Reader getCharacterStream(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Reader getCharacterStream(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Clob getClob(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Clob getClob(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public int getConcurrency() {
+ return ResultSet.CONCUR_READ_ONLY;
+ }
+
+ @Override
+ public String getCursorName() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Date getDate(int columnIndex) throws SQLException {
+ return new Date(getLong(columnIndex));
+ }
+
+ @Override
+ public Date getDate(String columnName) throws SQLException {
+ return getDate(findColumn(columnName));
+ }
+
+ @Override
+ public Date getDate(int arg0, Calendar arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
}
@Override
- protected void constructOneRow() {
- ioTDBRpcDataSet.constructOneRow();
+ public Date getDate(String arg0, Calendar arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
}
@Override
- protected void checkRecord() throws SQLException {
+ public double getDouble(int columnIndex) throws SQLException {
try {
- ioTDBRpcDataSet.checkRecord();
+ return getDouble(ioTDBRpcDataSet.findColumnNameByIndex(columnIndex));
} catch (StatementExecutionException e) {
throw new SQLException(e.getMessage());
}
}
@Override
- protected String getValueByName(String columnName) throws SQLException {
+ public double getDouble(String columnName) throws SQLException {
try {
- return ioTDBRpcDataSet.getValueByName(columnName);
+ return ioTDBRpcDataSet.getDouble(columnName);
} catch (StatementExecutionException e) {
throw new SQLException(e.getMessage());
}
}
@Override
- protected Object getObjectByName(String columnName) throws SQLException {
+ public int getFetchDirection() {
+ return ResultSet.FETCH_FORWARD;
+ }
+
+ @Override
+ public void setFetchDirection(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void setFetchSize(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public float getFloat(int columnIndex) throws SQLException {
try {
- return ioTDBRpcDataSet.getObjectByName(columnName);
+ return ioTDBRpcDataSet.getFloat(columnIndex);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public float getFloat(String columnName) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getFloat(columnName);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public int getInt(int columnIndex) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getInt(columnIndex);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public int getInt(String columnName) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getInt(columnName);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public long getLong(int columnIndex) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getLong(columnIndex);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public long getLong(String columnName) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getLong(columnName);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public ResultSetMetaData getMetaData() {
+ String operationType = "";
+ boolean nonAlign = false;
+ try {
+ if (statement.getResultSet() != null) {
+ operationType = ((IoTDBJDBCResultSet) statement.getResultSet()).getOperationType();
+ this.sgColumns = ((IoTDBJDBCResultSet) statement.getResultSet()).getSgColumns();
+ }
+ } catch (SQLException throwables) {
+ throwables.printStackTrace();
+ }
+ return new IoTDBResultMetadata(
+ nonAlign,
+ sgColumns,
+ operationType,
+ ioTDBRpcDataSet.columnNameList,
+ ioTDBRpcDataSet.columnTypeList,
+ ioTDBRpcDataSet.ignoreTimeStamp);
+ }
+
+ @Override
+ public Reader getNCharacterStream(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Reader getNCharacterStream(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public NClob getNClob(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public NClob getNClob(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public String getNString(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public String getNString(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Object getObject(int columnIndex) throws SQLException {
+ try {
+ return getObject(ioTDBRpcDataSet.findColumnNameByIndex(columnIndex));
} catch (StatementExecutionException e) {
throw new SQLException(e.getMessage());
}
}
+ @Override
+ public Object getObject(String columnName) throws SQLException {
+ return getObjectByName(columnName);
+ }
+
+ @Override
+ public Object getObject(int arg0, Map<String, Class<?>> arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Object getObject(String arg0, Map<String, Class<?>> arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public <T> T getObject(int arg0, Class<T> arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public <T> T getObject(String arg0, Class<T> arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Ref getRef(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Ref getRef(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public int getRow() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public RowId getRowId(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public RowId getRowId(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public SQLXML getSQLXML(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public SQLXML getSQLXML(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public short getShort(int columnIndex) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public short getShort(String columnName) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Statement getStatement() {
+ return this.statement;
+ }
+
+ @Override
+ public String getString(int columnIndex) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getString(columnIndex);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public String getString(String columnName) throws SQLException {
+ return getValueByName(columnName);
+ }
+
+ @Override
+ public Time getTime(int columnIndex) throws SQLException {
+ return new Time(getLong(columnIndex));
+ }
+
+ @Override
+ public Time getTime(String columnName) throws SQLException {
+ return getTime(findColumn(columnName));
+ }
+
+ @Override
+ public Time getTime(int arg0, Calendar arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Time getTime(String arg0, Calendar arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int columnIndex) throws SQLException {
+ return new Timestamp(getLong(columnIndex));
+ }
+
+ @Override
+ public Timestamp getTimestamp(String columnName) throws SQLException {
+ return getTimestamp(findColumn(columnName));
+ }
+
+ @Override
+ public Timestamp getTimestamp(int arg0, Calendar arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public Timestamp getTimestamp(String arg0, Calendar arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public int getType() {
+ return ResultSet.TYPE_FORWARD_ONLY;
+ }
+
+ @Override
+ public URL getURL(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public URL getURL(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public InputStream getUnicodeStream(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public InputStream getUnicodeStream(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public SQLWarning getWarnings() {
+ return warningChain;
+ }
+
+ @Override
+ public void insertRow() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isAfterLast() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isBeforeFirst() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return ioTDBRpcDataSet.isClosed;
+ }
+
+ @Override
+ public boolean isFirst() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isLast() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean last() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void moveToCurrentRow() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void moveToInsertRow() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean next() throws SQLException {
+ try {
+ return ioTDBRpcDataSet.next();
+ } catch (StatementExecutionException | IoTDBConnectionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ private boolean fetchResults() throws SQLException {
+ try {
+ return ioTDBRpcDataSet.fetchResults();
+ } catch (StatementExecutionException | IoTDBConnectionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean previous() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void refreshRow() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean relative(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean rowDeleted() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean rowInserted() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean rowUpdated() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateArray(int arg0, Array arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateArray(String arg0, Array arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateAsciiStream(int arg0, InputStream arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateAsciiStream(String arg0, InputStream arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateAsciiStream(int arg0, InputStream arg1, int arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateAsciiStream(String arg0, InputStream arg1, int arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateAsciiStream(int arg0, InputStream arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateAsciiStream(String arg0, InputStream arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBigDecimal(int arg0, BigDecimal arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBigDecimal(String arg0, BigDecimal arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBinaryStream(int arg0, InputStream arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBinaryStream(String arg0, InputStream arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBinaryStream(int arg0, InputStream arg1, int arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBinaryStream(String arg0, InputStream arg1, int arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBinaryStream(int arg0, InputStream arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBinaryStream(String arg0, InputStream arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBlob(int arg0, Blob arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBlob(String arg0, Blob arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBlob(int arg0, InputStream arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBlob(String arg0, InputStream arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBlob(int arg0, InputStream arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBlob(String arg0, InputStream arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBoolean(int arg0, boolean arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBoolean(String arg0, boolean arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateByte(int arg0, byte arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateByte(String arg0, byte arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBytes(int arg0, byte[] arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateBytes(String arg0, byte[] arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateCharacterStream(int arg0, Reader arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateCharacterStream(String arg0, Reader arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateCharacterStream(int arg0, Reader arg1, int arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateCharacterStream(String arg0, Reader arg1, int arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateCharacterStream(int arg0, Reader arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateCharacterStream(String arg0, Reader arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateClob(int arg0, Clob arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateClob(String arg0, Clob arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateClob(int arg0, Reader arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateClob(String arg0, Reader arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateClob(int arg0, Reader arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateClob(String arg0, Reader arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateDate(int arg0, Date arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateDate(String arg0, Date arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateDouble(int arg0, double arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateDouble(String arg0, double arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateFloat(int arg0, float arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateFloat(String arg0, float arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateInt(int arg0, int arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateInt(String arg0, int arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateLong(int arg0, long arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateLong(String arg0, long arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNCharacterStream(int arg0, Reader arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNCharacterStream(String arg0, Reader arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNCharacterStream(int arg0, Reader arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNCharacterStream(String arg0, Reader arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNClob(int arg0, NClob arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNClob(String arg0, NClob arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNClob(int arg0, Reader arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNClob(String arg0, Reader arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNClob(int arg0, Reader arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNClob(String arg0, Reader arg1, long arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNString(int arg0, String arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNString(String arg0, String arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNull(int arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateNull(String arg0) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateObject(int arg0, Object arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateObject(String arg0, Object arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateObject(int arg0, Object arg1, int arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateObject(String arg0, Object arg1, int arg2) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateRef(int arg0, Ref arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateRef(String arg0, Ref arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateRow() throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateRowId(int arg0, RowId arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateRowId(String arg0, RowId arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateSQLXML(int arg0, SQLXML arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateSQLXML(String arg0, SQLXML arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateShort(int arg0, short arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateShort(String arg0, short arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateString(int arg0, String arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateString(String arg0, String arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateTime(int arg0, Time arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateTime(String arg0, Time arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateTimestamp(int arg0, Timestamp arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public void updateTimestamp(String arg0, Timestamp arg1) throws SQLException {
+ throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean wasNull() {
+ return ioTDBRpcDataSet.lastReadWasNull;
+ }
+
+ protected String getValueByName(String columnName) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getValueByName(columnName);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ protected Object getObjectByName(String columnName) throws SQLException {
+ try {
+ return ioTDBRpcDataSet.getObjectByName(columnName);
+ } catch (StatementExecutionException e) {
+ throw new SQLException(e.getMessage());
+ }
+ }
+
+ public boolean isSetTracingInfo() {
+ if (ioTDBRpcTracingInfo == null) {
+ return false;
+ }
+ return ioTDBRpcTracingInfo.isSetTracingInfo();
+ }
+
+ public List<String> getActivityList() {
+ return ioTDBRpcTracingInfo.getActivityList();
+ }
+
+ public List<Long> getElapsedTimeList() {
+ return ioTDBRpcTracingInfo.getElapsedTimeList();
+ }
+
+ public long getStatisticsByName(String name) throws Exception {
+ return ioTDBRpcTracingInfo.getStatisticsByName(name);
+ }
+
+ public String getStatisticsInfoByName(String name) throws Exception {
+ return ioTDBRpcTracingInfo.getStatisticsInfoByName(name);
+ }
+
public boolean isIgnoreTimeStamp() {
return ioTDBRpcDataSet.ignoreTimeStamp;
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index cef005d529..3fb925e46b 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -258,7 +258,7 @@ public class IoTDBStatement implements Statement {
}
execReq.setFetchSize(rows);
execReq.setTimeout((long) queryTimeout * 1000);
- TSExecuteStatementResp execResp = client.executeStatement(execReq);
+ TSExecuteStatementResp execResp = client.executeStatementV2(execReq);
try {
RpcUtils.verifySuccess(execResp.getStatus());
} catch (StatementExecutionException e) {
@@ -268,7 +268,7 @@ public class IoTDBStatement implements Statement {
deepCopyResp(execResp);
if (execResp.isSetColumns()) {
queryId = execResp.getQueryId();
- if (execResp.queryDataSet == null) {
+ if (execResp.queryResult == null) {
BitSet aliasColumn = listToBitSet(execResp.getAliasColumns());
this.resultSet =
new IoTDBNonAlignJDBCResultSet(
@@ -299,7 +299,7 @@ public class IoTDBStatement implements Statement {
sql,
queryId,
sessionId,
- execResp.queryDataSet,
+ execResp.queryResult,
execResp.tracingInfo,
execReq.timeout,
true);
@@ -406,7 +406,7 @@ public class IoTDBStatement implements Statement {
execReq.setFetchSize(rows);
execReq.setTimeout(timeoutInMS);
execReq.setJdbcQuery(true);
- TSExecuteStatementResp execResp = client.executeQueryStatement(execReq);
+ TSExecuteStatementResp execResp = client.executeQueryStatementV2(execReq);
queryId = execResp.getQueryId();
try {
RpcUtils.verifySuccess(execResp.getStatus());
@@ -421,7 +421,7 @@ public class IoTDBStatement implements Statement {
if (execResp.getAliasColumns() != null && execResp.getAliasColumns().size() > 0) {
aliasColumn = listToBitSet(execResp.getAliasColumns());
}
- if (execResp.queryDataSet == null) {
+ if (execResp.queryResult == null) {
this.resultSet =
new IoTDBNonAlignJDBCResultSet(
this,
@@ -451,7 +451,7 @@ public class IoTDBStatement implements Statement {
sql,
queryId,
sessionId,
- execResp.queryDataSet,
+ execResp.queryResult,
execResp.tracingInfo,
execReq.timeout,
execResp.operationType,
@@ -476,9 +476,21 @@ public class IoTDBStatement implements Statement {
if (Objects.nonNull(tsQueryDataSet)) {
deepCopyTsQueryDataSet(tsQueryDataSet);
- } else {
+ } else if (Objects.nonNull(nonAlignDataSet)) {
deepCopyNonAlignQueryDataSet(nonAlignDataSet);
+ } else {
+ deepCopyQueryResult(queryRes);
+ }
+ }
+
+ private void deepCopyQueryResult(TSExecuteStatementResp queryRes) {
+ List<ByteBuffer> queryResult = queryRes.getQueryResult();
+ if (queryResult == null) {
+ return;
}
+ final List<ByteBuffer> queryResultCopy =
+ queryResult.stream().map(ReadWriteIOUtils::clone).collect(Collectors.toList());
+ queryRes.setQueryResult(queryResultCopy);
}
private void deepCopyNonAlignQueryDataSet(TSQueryNonAlignDataSet nonAlignDataSet) {
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
index 84de9a2524..029439e851 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
@@ -70,7 +70,7 @@ public class IoTDBDatabaseMetadataTest {
when(connection.createStatement())
.thenReturn(new IoTDBStatement(connection, client, sessionId, zoneID, 0, 1L));
databaseMetaData = new IoTDBDatabaseMetadata(connection, client, sessionId);
- when(client.executeStatement(any(TSExecuteStatementReq.class))).thenReturn(execStatementResp);
+ when(client.executeStatementV2(any(TSExecuteStatementReq.class))).thenReturn(execStatementResp);
when(client.getProperties()).thenReturn(properties);
when(execStatementResp.getStatus()).thenReturn(successStatus);
when(execStatementResp.getQueryId()).thenReturn(queryId);
@@ -129,7 +129,7 @@ public class IoTDBDatabaseMetadataTest {
columnsList.add("storage group");
Map<String, Integer> columnNameIndexMap = new HashMap<String, Integer>();
columnNameIndexMap.put("storage group", 0);
- when(client.executeQueryStatement(any(TSExecuteStatementReq.class)))
+ when(client.executeQueryStatementV2(any(TSExecuteStatementReq.class)))
.thenReturn(execStatementResp);
when(execStatementResp.getStatus()).thenReturn(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
when(execStatementResp.getQueryId()).thenReturn(queryId);
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
index 4d9b6f2b51..defd180750 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.junit.Assert;
import org.junit.Before;
@@ -48,6 +50,7 @@ import java.sql.Types;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -123,17 +126,17 @@ public class IoTDBJDBCResultSetTest {
statement = new IoTDBStatement(connection, client, sessionId, zoneID);
- execResp.queryDataSet = FakedFirstFetchResult();
+ execResp.queryResult = FakedFirstFetchTsBlockResult();
when(connection.isClosed()).thenReturn(false);
- when(client.executeStatement(any(TSExecuteStatementReq.class))).thenReturn(execResp);
+ when(client.executeStatementV2(any(TSExecuteStatementReq.class))).thenReturn(execResp);
when(execResp.getQueryId()).thenReturn(queryId);
when(execResp.getStatus()).thenReturn(successStatus);
when(client.fetchMetadata(any(TSFetchMetadataReq.class))).thenReturn(fetchMetadataResp);
when(fetchMetadataResp.getStatus()).thenReturn(successStatus);
- when(client.fetchResults(any(TSFetchResultsReq.class))).thenReturn(fetchResultsResp);
+ when(client.fetchResultsV2(any(TSFetchResultsReq.class))).thenReturn(fetchResultsResp);
when(fetchResultsResp.getStatus()).thenReturn(successStatus);
TSStatus closeResp = successStatus;
@@ -402,4 +405,72 @@ public class IoTDBJDBCResultSetTest {
standardObject.addAll(Arrays.asList(row));
}
}
+
+ private List<ByteBuffer> FakedFirstFetchTsBlockResult() {
+ List<TSDataType> tsDataTypeList = new ArrayList<>();
+ tsDataTypeList.add(TSDataType.FLOAT); // root.vehicle.d0.s2
+ tsDataTypeList.add(TSDataType.INT64); // root.vehicle.d0.s1
+ tsDataTypeList.add(TSDataType.INT32); // root.vehicle.d0.s0
+
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(tsDataTypeList);
+
+ Object[][] input = {
+ {
+ 2L, 2.22F, 40000L, null,
+ },
+ {
+ 3L, 3.33F, null, null,
+ },
+ {
+ 4L, 4.44F, null, null,
+ },
+ {
+ 50L, null, 50000L, null,
+ },
+ {
+ 100L, null, 199L, null,
+ },
+ {
+ 101L, null, 199L, null,
+ },
+ {
+ 103L, null, 199L, null,
+ },
+ {
+ 105L, 11.11F, 199L, 33333,
+ },
+ {
+ 1000L, 1000.11F, 55555L, 22222,
+ }
+ };
+ for (int row = 0; row < input.length; row++) {
+ tsBlockBuilder.getTimeColumnBuilder().writeLong((long) input[row][0]);
+ if (input[row][1] != null) {
+ tsBlockBuilder.getColumnBuilder(0).writeFloat((float) input[row][1]);
+ } else {
+ tsBlockBuilder.getColumnBuilder(0).appendNull();
+ }
+ if (input[row][2] != null) {
+ tsBlockBuilder.getColumnBuilder(1).writeLong((long) input[row][2]);
+ } else {
+ tsBlockBuilder.getColumnBuilder(1).appendNull();
+ }
+ if (input[row][3] != null) {
+ tsBlockBuilder.getColumnBuilder(2).writeInt((int) input[row][3]);
+ } else {
+ tsBlockBuilder.getColumnBuilder(2).appendNull();
+ }
+
+ tsBlockBuilder.declarePosition();
+ }
+
+ ByteBuffer tsBlock = null;
+ try {
+ tsBlock = new TsBlockSerde().serialize(tsBlockBuilder.build());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return Collections.singletonList(tsBlock);
+ }
}
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBPreparedStatementTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBPreparedStatementTest.java
index 0d2efc4566..2955fcc9cb 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBPreparedStatementTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBPreparedStatementTest.java
@@ -58,7 +58,7 @@ public class IoTDBPreparedStatementTest {
when(execStatementResp.getStatus()).thenReturn(Status_SUCCESS);
when(execStatementResp.getQueryId()).thenReturn(queryId);
- when(client.executeStatement(any(TSExecuteStatementReq.class))).thenReturn(execStatementResp);
+ when(client.executeStatementV2(any(TSExecuteStatementReq.class))).thenReturn(execStatementResp);
}
@SuppressWarnings("resource")
@@ -72,7 +72,7 @@ public class IoTDBPreparedStatementTest {
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE temperature < 24 and time > 2017-11-1 0:13:00",
argument.getValue().getStatement());
@@ -110,7 +110,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE temperature < 123 and time > 2017-11-1 0:13:00",
argument.getValue().getStatement());
@@ -127,7 +127,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE temperature < 123 and time > 2017-11-1 0:13:00",
argument.getValue().getStatement());
@@ -144,7 +144,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE temperature < 123.133 and time > 2017-11-1 0:13:00",
argument.getValue().getStatement());
@@ -161,7 +161,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE temperature < 123.456 and time > 2017-11-1 0:13:00",
argument.getValue().getStatement());
@@ -178,7 +178,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE temperature < false and time > 2017-11-1 0:13:00",
argument.getValue().getStatement());
@@ -195,7 +195,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE temperature < 'abcde' and time > 2017-11-1 0:13:00",
argument.getValue().getStatement());
@@ -212,7 +212,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE temperature < \"abcde\" and time > 2017-11-1 0:13:00",
argument.getValue().getStatement());
@@ -228,7 +228,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01", argument.getValue().getStatement());
}
@@ -243,7 +243,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE time > 1233",
argument.getValue().getStatement());
@@ -259,7 +259,7 @@ public class IoTDBPreparedStatementTest {
ps.execute();
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE time > 2017-11-01T00:13:00",
argument.getValue().getStatement());
@@ -277,7 +277,7 @@ public class IoTDBPreparedStatementTest {
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE status = '134' and temperature = 1333",
argument.getValue().getStatement());
@@ -296,7 +296,7 @@ public class IoTDBPreparedStatementTest {
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"SELECT status, temperature FROM root.ln.wf01.wt01 WHERE status = '\\044e' || temperature = -1323.0",
argument.getValue().getStatement());
@@ -320,7 +320,7 @@ public class IoTDBPreparedStatementTest {
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"INSERT INTO root.ln.wf01.wt01(time,a,b,c,d,e,f) VALUES(12324,false,123,123234345,123.423,-1323.0,'abc')",
argument.getValue().getStatement());
@@ -344,7 +344,7 @@ public class IoTDBPreparedStatementTest {
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"INSERT INTO root.ln.wf01.wt01(time,a,b,c,d,e,f) VALUES(2017-11-01T00:13:00,false,123,123234345,123.423,-1323.0,\"abc\")",
argument.getValue().getStatement());
@@ -367,7 +367,7 @@ public class IoTDBPreparedStatementTest {
ArgumentCaptor<TSExecuteStatementReq> argument =
ArgumentCaptor.forClass(TSExecuteStatementReq.class);
- verify(client).executeStatement(argument.capture());
+ verify(client).executeStatementV2(argument.capture());
assertEquals(
"INSERT INTO root.ln.wf01.wt02(time,a,b,c,d,e,f) VALUES(2020-01-01T10:10:10,false,123,123234345,123.423,-1323.0,\"abc\")",
argument.getValue().getStatement());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
index f2538c8db7..5a94184e67 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
@@ -18,11 +18,14 @@
*/
package org.apache.iotdb.db.mpp.execution.exchange;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
+import java.nio.ByteBuffer;
+
public interface ISourceHandle {
/** Get the local fragment instance ID that this source handle belongs to. */
@@ -40,6 +43,13 @@ public interface ISourceHandle {
*/
TsBlock receive();
+ /**
+ * Get the serialized {@link TsBlock} as the form of bytebuffer. This method share the same
+ * iterator with receive(). When one of these two methods is called, the cursor in iterator will
+ * forward.
+ */
+ ByteBuffer getSerializedTsBlock() throws IoTDBException;
+
/** If there are more tsblocks. */
boolean isFinished();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 6c17791efd..c847af598b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -19,16 +19,21 @@
package org.apache.iotdb.db.mpp.execution.exchange;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
@@ -49,6 +54,8 @@ public class LocalSourceHandle implements ISourceHandle {
private final String threadName;
+ private static final TsBlockSerde serde = new TsBlockSerde();
+
public LocalSourceHandle(
TFragmentInstanceId remoteFragmentInstanceId,
TFragmentInstanceId localFragmentInstanceId,
@@ -103,6 +110,20 @@ public class LocalSourceHandle implements ISourceHandle {
}
}
+ @Override
+ public ByteBuffer getSerializedTsBlock() throws IoTDBException {
+ TsBlock tsBlock = receive();
+ if (tsBlock != null) {
+ try {
+ return serde.serialize(tsBlock);
+ } catch (Exception e) {
+ throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
+ }
+ } else {
+ return null;
+ }
+ }
+
@Override
public boolean isFinished() {
synchronized (queue) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index bfd9a017ea..d788aae6e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -66,8 +66,8 @@ public class SourceHandle implements ISourceHandle {
private final TsBlockSerde serde;
private final SourceHandleListener sourceHandleListener;
- private final Map<Integer, TsBlock> sequenceIdToTsBlock = new HashMap<>();
private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
+ private final Map<Integer, ByteBuffer> sequenceIdToTsBlock = new HashMap<>();
private final String threadName;
private long retryIntervalInMs;
@@ -116,6 +116,16 @@ public class SourceHandle implements ISourceHandle {
@Override
public synchronized TsBlock receive() {
+ ByteBuffer tsBlock = getSerializedTsBlock();
+ if (tsBlock != null) {
+ return serde.deserialize(tsBlock);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public synchronized ByteBuffer getSerializedTsBlock() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
checkState();
@@ -124,7 +134,7 @@ public class SourceHandle implements ISourceHandle {
throw new IllegalStateException("Source handle is blocked.");
}
- TsBlock tsBlock = sequenceIdToTsBlock.remove(currSequenceId);
+ ByteBuffer tsBlock = sequenceIdToTsBlock.remove(currSequenceId);
if (tsBlock == null) {
return null;
}
@@ -385,11 +395,9 @@ public class SourceHandle implements ISourceHandle {
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
TGetDataBlockResponse resp = client.getDataBlock(req);
- List<TsBlock> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
- for (ByteBuffer byteBuffer : resp.getTsBlocks()) {
- TsBlock tsBlock = serde.deserialize(byteBuffer);
- tsBlocks.add(tsBlock);
- }
+ List<ByteBuffer> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
+ tsBlocks.addAll(resp.getTsBlocks());
+
logger.info("[EndPullTsBlocksFromRemote] Count:{}", tsBlocks.size());
executorService.submit(
new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index ba0380d015..b89bc3c667 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import java.nio.ByteBuffer;
import java.util.Optional;
public interface IQueryExecution {
@@ -37,6 +38,8 @@ public interface IQueryExecution {
Optional<TsBlock> getBatchResult() throws IoTDBException;
+ Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException;
+
boolean hasNextResult();
int getOutputValueColumnCount();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 697e265726..edbff46b71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -68,6 +68,7 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -162,6 +163,11 @@ public class QueryExecution implements IQueryExecution {
});
}
+ @FunctionalInterface
+ interface ISourceHandleSupplier<T> {
+ T get() throws IoTDBException;
+ }
+
public void start() {
if (skipExecute()) {
logger.info("[SkipExecute]");
@@ -341,8 +347,7 @@ public class QueryExecution implements IQueryExecution {
* DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
* implemented with DataStreamManager)
*/
- @Override
- public Optional<TsBlock> getBatchResult() throws IoTDBException {
+ private <T> Optional<T> getResult(ISourceHandleSupplier<T> dataSupplier) throws IoTDBException {
checkArgument(resultHandle != null, "ResultHandle in Coordinator should be init firstly.");
// iterate until we get a non-nullable TsBlock or result is finished
while (true) {
@@ -358,9 +363,6 @@ public class QueryExecution implements IQueryExecution {
stateMachine.getFailureMessage(), TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
}
} else if (resultHandle.isFinished()) {
- // Once the resultHandle is finished, we should transit the state of this query to
- // FINISHED.
- // So that the corresponding cleanup work could be triggered.
logger.info("[ResultHandleFinished]");
stateMachine.transitionToFinished();
return Optional.empty();
@@ -369,7 +371,8 @@ public class QueryExecution implements IQueryExecution {
ListenableFuture<?> blocked = resultHandle.isBlocked();
blocked.get();
if (!resultHandle.isFinished()) {
- TsBlock res = resultHandle.receive();
+ // use the getSerializedTsBlock instead of receive to get ByteBuffer result
+ T res = dataSupplier.get();
if (res == null) {
continue;
}
@@ -397,6 +400,24 @@ public class QueryExecution implements IQueryExecution {
}
}
+ @Override
+ public Optional<TsBlock> getBatchResult() throws IoTDBException {
+ return getResult(this::getDeserializedTsBlock);
+ }
+
+ private TsBlock getDeserializedTsBlock() {
+ return resultHandle.receive();
+ }
+
+ @Override
+ public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
+ return getResult(this::getSerializedTsBlock);
+ }
+
+ private ByteBuffer getSerializedTsBlock() throws IoTDBException {
+ return resultHandle.getSerializedTsBlock();
+ }
+
/** @return true if there is more tsblocks, otherwise false */
@Override
public boolean hasNextResult() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index 08657095ec..7e379425fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -46,6 +47,8 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -67,6 +70,8 @@ public class ConfigExecution implements IQueryExecution {
private final IConfigTask task;
private IConfigTaskExecutor configTaskExecutor;
+ private static final TsBlockSerde serde = new TsBlockSerde();
+
public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
this.context = context;
this.executor = executor;
@@ -175,6 +180,19 @@ public class ConfigExecution implements IQueryExecution {
return Optional.empty();
}
+ @Override
+ public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
+ if (!resultSetConsumed) {
+ resultSetConsumed = true;
+ try {
+ return Optional.of(serde.serialize(resultSet));
+ } catch (IOException e) {
+ throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
+ }
+ }
+ return Optional.empty();
+ }
+
// According to the execution process of ConfigExecution, there is only one TsBlock for
// this execution. Thus, the hasNextResult will be false once the TsBlock is consumed
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index 2cf224962b..bb116f7ef7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -19,13 +19,19 @@
package org.apache.iotdb.db.mpp.plan.execution.memory;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
import static com.google.common.util.concurrent.Futures.immediateFuture;
public class MemorySourceHandle implements ISourceHandle {
@@ -33,6 +39,8 @@ public class MemorySourceHandle implements ISourceHandle {
private final TsBlock result;
private boolean hasNext;
+ private static final TsBlockSerde serde = new TsBlockSerde();
+
public MemorySourceHandle(TsBlock result) {
Validate.notNull(result, "the TsBlock should not be null when constructing MemorySourceHandle");
this.result = result;
@@ -60,6 +68,20 @@ public class MemorySourceHandle implements ISourceHandle {
return result;
}
+ @Override
+ public synchronized ByteBuffer getSerializedTsBlock() throws IoTDBException {
+ hasNext = false;
+ if (result.isEmpty()) {
+ return null;
+ } else {
+ try {
+ return serde.serialize(result);
+ } catch (IOException e) {
+ throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
+ }
+ }
+ }
+
@Override
public synchronized boolean isFinished() {
return !hasNext;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 96ca1fdcf0..6c0873f5f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -116,6 +116,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -148,6 +149,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
private final ISchemaFetcher SCHEMA_FETCHER;
+ @FunctionalInterface
+ public interface SelectResult {
+ public void apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
+ throws IoTDBException, IOException;
+ }
+
+ private static final SelectResult SELECT_RESULT =
+ (resp, queryExecution, fetchSize) ->
+ resp.setQueryResult(
+ QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize));
+
+ private static final SelectResult OLD_SELECT_RESULT =
+ (resp, queryExecution, fetchSize) ->
+ resp.setQueryDataSet(
+ QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize));
+
public ClientRPCServiceImpl() {
if (config.isClusterMode()) {
PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
@@ -158,6 +175,246 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
}
+ private TSExecuteStatementResp executeStatementInternal(
+ TSExecuteStatementReq req, SelectResult setResult) {
+ String statement = req.getStatement();
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.currentTimeMillis();
+ try {
+ Statement s =
+ StatementGenerator.createStatement(
+ statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+ if (s == null) {
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
+ }
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ QUERY_FREQUENCY_RECORDER.incrementAndGet();
+ AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
+
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ statement,
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+ resp.setStatus(result.status);
+ setResult.apply(resp, queryExecution, req.fetchSize);
+ } else {
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ long costTime = System.currentTimeMillis() - startTime;
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+ }
+ }
+ }
+
+ private TSExecuteStatementResp executeRawDataQueryInternal(
+ TSRawDataQueryReq req, SelectResult setResult) {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+ long startTime = System.currentTimeMillis();
+ try {
+ Statement s =
+ StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+
+ QUERY_FREQUENCY_RECORDER.incrementAndGet();
+ AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException("error code: " + result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution.isQuery()) {
+ resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+ resp.setStatus(result.status);
+ setResult.apply(resp, queryExecution, req.fetchSize);
+ } else {
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ // TODO call the coordinator to release query resource
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ long costTime = System.currentTimeMillis() - startTime;
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+ }
+ }
+ }
+
+ private TSExecuteStatementResp executeLastDataQueryInternal(
+ TSLastDataQueryReq req, SelectResult setResult) {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+ long startTime = System.currentTimeMillis();
+ try {
+ Statement s =
+ StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return RpcUtils.getTSExecuteStatementResp(status);
+ }
+ QUERY_FREQUENCY_RECORDER.incrementAndGet();
+ AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
+ long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+ // create and cache dataset
+ ExecutionResult result =
+ COORDINATOR.execute(
+ s,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ req.getTimeout());
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException("error code: " + result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution.isQuery()) {
+ resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+ resp.setStatus(result.status);
+ setResult.apply(resp, queryExecution, req.fetchSize);
+ } else {
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+
+ } catch (Exception e) {
+ // TODO call the coordinator to release query resource
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+ } finally {
+ addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+ long costTime = System.currentTimeMillis() - startTime;
+ if (costTime >= CONFIG.getSlowQueryThreshold()) {
+ SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+ }
+ }
+ }
+
+ @Override
+ public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req) {
+ return executeStatementV2(req);
+ }
+
+ @Override
+ public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req) {
+ return executeStatementV2(req);
+ }
+
+ @Override
+ public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) {
+ return executeStatementInternal(req, SELECT_RESULT);
+ }
+
+ @Override
+ public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) {
+ return executeRawDataQueryInternal(req, SELECT_RESULT);
+ }
+
+ @Override
+ public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) {
+ return executeLastDataQueryInternal(req, SELECT_RESULT);
+ }
+
+ @Override
+ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
+ }
+ TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
+ try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
+ List<ByteBuffer> result =
+ QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize);
+ boolean hasResultSet = !(result.size() == 0);
+ resp.setHasResultSet(hasResultSet);
+ resp.setIsAlign(true);
+ resp.setQueryResult(result);
+ QUERY_TIME_MANAGER.unRegisterQuery(req.queryId, false);
+ if (!hasResultSet) {
+ COORDINATOR.removeQueryExecution(req.queryId);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+ }
+ }
+
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
@@ -570,74 +827,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
- String statement = req.getStatement();
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
- }
-
- long startTime = System.currentTimeMillis();
- try {
- Statement s =
- StatementGenerator.createStatement(
- statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
-
- if (s == null) {
- return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(
- TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
- }
- // permission check
- TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return RpcUtils.getTSExecuteStatementResp(status);
- }
-
- QUERY_FREQUENCY_RECORDER.incrementAndGet();
- AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
-
- long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
- // create and cache dataset
- ExecutionResult result =
- COORDINATOR.execute(
- s,
- queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- statement,
- PARTITION_FETCHER,
- SCHEMA_FETCHER,
- req.getTimeout());
-
- if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
- return RpcUtils.getTSExecuteStatementResp(result.status);
- }
-
- IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
-
- try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
- TSExecuteStatementResp resp;
- if (queryExecution != null && queryExecution.isQuery()) {
- resp = createResponse(queryExecution.getDatasetHeader(), queryId);
- resp.setStatus(result.status);
- resp.setQueryDataSet(
- QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
- } else {
- resp = RpcUtils.getTSExecuteStatementResp(result.status);
- }
-
- return resp;
- }
- } catch (Exception e) {
- // TODO call the coordinator to release query resource
- return RpcUtils.getTSExecuteStatementResp(
- onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
- } finally {
- addOperationLatency(Operation.EXECUTE_QUERY, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
- }
- }
+ return executeStatementInternal(req, OLD_SELECT_RESULT);
}
@Override
@@ -699,13 +889,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
@Override
- public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
+ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
return executeStatement(req);
}
@Override
- public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
- throws TException {
+ public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
return executeStatement(req);
}
@@ -720,7 +909,6 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
-
TSQueryDataSet result =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize);
boolean hasResultSet = result.bufferForTime().limit() != 0;
@@ -728,7 +916,6 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
resp.setHasResultSet(hasResultSet);
resp.setQueryDataSet(result);
resp.setIsAlign(true);
-
QUERY_TIME_MANAGER.unRegisterQuery(req.queryId, false);
if (!hasResultSet) {
COORDINATOR.removeQueryExecution(req.queryId);
@@ -1180,125 +1367,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
- }
- long startTime = System.currentTimeMillis();
- try {
- Statement s =
- StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
-
- // permission check
- TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return RpcUtils.getTSExecuteStatementResp(status);
- }
-
- QUERY_FREQUENCY_RECORDER.incrementAndGet();
- AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
- long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
- // create and cache dataset
- ExecutionResult result =
- COORDINATOR.execute(
- s,
- queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "",
- PARTITION_FETCHER,
- SCHEMA_FETCHER,
- req.getTimeout());
-
- if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new RuntimeException("error code: " + result.status);
- }
-
- IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
-
- try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
- TSExecuteStatementResp resp;
- if (queryExecution.isQuery()) {
- resp = createResponse(queryExecution.getDatasetHeader(), queryId);
- resp.setStatus(result.status);
- resp.setQueryDataSet(
- QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
- } else {
- resp = RpcUtils.getTSExecuteStatementResp(result.status);
- }
- return resp;
- }
- } catch (Exception e) {
- // TODO call the coordinator to release query resource
- return RpcUtils.getTSExecuteStatementResp(
- onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
- } finally {
- addOperationLatency(Operation.EXECUTE_QUERY, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
- }
- }
+ return executeRawDataQueryInternal(req, OLD_SELECT_RESULT);
}
@Override
public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
- return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
- }
- long startTime = System.currentTimeMillis();
- try {
- Statement s =
- StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
-
- // permission check
- TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return RpcUtils.getTSExecuteStatementResp(status);
- }
-
- QUERY_FREQUENCY_RECORDER.incrementAndGet();
- AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
- long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
- // create and cache dataset
- ExecutionResult result =
- COORDINATOR.execute(
- s,
- queryId,
- SESSION_MANAGER.getSessionInfo(req.sessionId),
- "",
- PARTITION_FETCHER,
- SCHEMA_FETCHER,
- req.getTimeout());
-
- if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new RuntimeException("error code: " + result.status);
- }
-
- IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
-
- try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
- TSExecuteStatementResp resp;
- if (queryExecution.isQuery()) {
- resp = createResponse(queryExecution.getDatasetHeader(), queryId);
- resp.setStatus(result.status);
- resp.setQueryDataSet(
- QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize));
- } else {
- resp = RpcUtils.getTSExecuteStatementResp(result.status);
- }
- return resp;
- }
-
- } catch (Exception e) {
- // TODO call the coordinator to release query resource
- return RpcUtils.getTSExecuteStatementResp(
- onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
- } finally {
- addOperationLatency(Operation.EXECUTE_QUERY, startTime);
- long costTime = System.currentTimeMillis() - startTime;
- if (costTime >= CONFIG.getSlowQueryThreshold()) {
- SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
- }
- }
+ return executeLastDataQueryInternal(req, OLD_SELECT_RESULT);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index aca7664144..48c298f937 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -315,6 +315,38 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
serviceProvider = IoTDB.serviceProvider;
}
+ @Override
+ public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req)
+ throws TException {
+ return null;
+ }
+
+ @Override
+ public TSExecuteStatementResp executeUpdateStatementV2(TSExecuteStatementReq req)
+ throws TException {
+ return null;
+ }
+
+ @Override
+ public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) throws TException {
+ return null;
+ }
+
+ @Override
+ public TSExecuteStatementResp executeRawDataQueryV2(TSRawDataQueryReq req) throws TException {
+ return null;
+ }
+
+ @Override
+ public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) throws TException {
+ return null;
+ }
+
+ @Override
+ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) throws TException {
+ return null;
+ }
+
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 8b4ff934c3..ca603bf535 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -38,6 +38,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -373,6 +374,32 @@ public class QueryDataSetUtils {
return tsQueryDataSet;
}
+ // To fetch required amounts of data and combine them through List
+ public static List<ByteBuffer> convertQueryResultByFetchSize(
+ IQueryExecution queryExecution, int fetchSize) throws IoTDBException {
+ int rowCount = 0;
+ List<ByteBuffer> res = new ArrayList<>();
+ while (rowCount < fetchSize) {
+ Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+ if (!optionalByteBuffer.isPresent()) {
+ break;
+ }
+ ByteBuffer byteBuffer = optionalByteBuffer.get();
+ byteBuffer.mark();
+ int valueColumnCount = byteBuffer.getInt();
+ for (int i = 0; i < valueColumnCount; i++) {
+ byteBuffer.get();
+ }
+ int positionCount = byteBuffer.getInt();
+ byteBuffer.reset();
+ if (positionCount != 0) {
+ res.add(byteBuffer);
+ }
+ rowCount += positionCount;
+ }
+ return res;
+ }
+
public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
long[] times = new long[size];
for (int i = 0; i < size; i++) {
diff --git a/service-rpc/pom.xml b/service-rpc/pom.xml
index 792a0b6154..5810db7f28 100644
--- a/service-rpc/pom.xml
+++ b/service-rpc/pom.xml
@@ -62,6 +62,14 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.openjdk.jol</groupId>
+ <artifactId>jol-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>slice</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 8d9d22201f..3077c571be 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -24,27 +24,24 @@ import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.thrift.TException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
public class IoTDBRpcDataSet {
public static final String TIMESTAMP_STR = "Time";
- public static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
public static final int START_INDEX = 2;
public String sql;
public boolean isClosed = false;
@@ -60,7 +57,6 @@ public class IoTDBRpcDataSet {
public boolean hasCachedRecord = false;
public boolean lastReadWasNull;
- public byte[][] values; // used to cache the current row record value
// column size
public int columnSize;
@@ -68,14 +64,15 @@ public class IoTDBRpcDataSet {
public long queryId;
public long statementId;
public boolean ignoreTimeStamp;
+ public boolean isRpcFetchResult;
- public int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
-
- public TSQueryDataSet tsQueryDataSet = null;
- public byte[] time; // used to cache the current time value
- public byte[] currentBitmap; // used to cache the current bitmap for every column
- public static final int FLAG =
- 0x80; // used to do `and` operation with bitmap to judge whether the value is null
+ public static final TsBlockSerde serde = new TsBlockSerde();
+ public List<ByteBuffer> queryResult;
+ public TsBlock curTsBlock;
+ public int queryResultSize; // the length of queryResult
+ public int queryResultIndex; // the index of bytebuffer in queryResult
+ public int tsBlockSize; // the size of current tsBlock
+ public int tsBlockIndex; // the row index in current tsBlock
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public IoTDBRpcDataSet(
@@ -84,11 +81,12 @@ public class IoTDBRpcDataSet {
List<String> columnTypeList,
Map<String, Integer> columnNameIndex,
boolean ignoreTimeStamp,
+ boolean isRpcFetchResult,
long queryId,
long statementId,
IClientRPCService.Iface client,
long sessionId,
- TSQueryDataSet queryDataSet,
+ List<ByteBuffer> queryResult,
int fetchSize,
long timeout) {
this.sessionId = sessionId;
@@ -99,6 +97,7 @@ public class IoTDBRpcDataSet {
this.client = client;
this.fetchSize = fetchSize;
this.timeout = timeout;
+ this.isRpcFetchResult = isRpcFetchResult;
columnSize = columnNameList.size();
this.columnNameList = new ArrayList<>();
@@ -146,37 +145,107 @@ public class IoTDBRpcDataSet {
}
}
- time = new byte[Long.BYTES];
- currentBitmap = new byte[columnTypeDeduplicatedList.size()];
- values = new byte[columnTypeDeduplicatedList.size()][];
- for (int i = 0; i < values.length; i++) {
- TSDataType dataType = columnTypeDeduplicatedList.get(i);
- switch (dataType) {
- case BOOLEAN:
- values[i] = new byte[1];
- break;
- case INT32:
- values[i] = new byte[Integer.BYTES];
- break;
- case INT64:
- values[i] = new byte[Long.BYTES];
- break;
- case FLOAT:
- values[i] = new byte[Float.BYTES];
- break;
- case DOUBLE:
- values[i] = new byte[Double.BYTES];
- break;
- case TEXT:
- values[i] = null;
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
+ this.queryResult = queryResult;
+ this.queryResultSize = 0;
+ if (queryResult != null) {
+ queryResultSize = queryResult.size();
+ }
+ this.queryResultIndex = 0;
+ this.tsBlockSize = 0;
+ this.tsBlockIndex = -1;
+ this.emptyResultSet = this.queryResultSize == 0;
+ }
+
+ public IoTDBRpcDataSet(
+ String sql,
+ List<String> columnNameList,
+ List<String> columnTypeList,
+ Map<String, Integer> columnNameIndex,
+ boolean ignoreTimeStamp,
+ boolean isRpcFetchResult,
+ long queryId,
+ long statementId,
+ IClientRPCService.Iface client,
+ long sessionId,
+ List<ByteBuffer> queryResult,
+ int fetchSize,
+ long timeout,
+ List<String> sgList,
+ BitSet aliasColumnMap) {
+ this.sessionId = sessionId;
+ this.statementId = statementId;
+ this.ignoreTimeStamp = ignoreTimeStamp;
+ this.sql = sql;
+ this.queryId = queryId;
+ this.client = client;
+ this.fetchSize = fetchSize;
+ this.timeout = timeout;
+ this.isRpcFetchResult = isRpcFetchResult;
+ columnSize = columnNameList.size();
+
+ this.columnNameList = new ArrayList<>();
+ this.columnTypeList = new ArrayList<>();
+ if (!ignoreTimeStamp) {
+ this.columnNameList.add(TIMESTAMP_STR);
+ this.columnTypeList.add(String.valueOf(TSDataType.INT64));
+ }
+ // deduplicate and map
+ this.columnOrdinalMap = new HashMap<>();
+ if (!ignoreTimeStamp) {
+ this.columnOrdinalMap.put(TIMESTAMP_STR, 1);
+ }
+
+ // deduplicate and map
+ if (columnNameIndex != null) {
+ int deduplicatedColumnSize = (int) columnNameIndex.values().stream().distinct().count();
+ this.columnTypeDeduplicatedList = new ArrayList<>(deduplicatedColumnSize);
+ for (int i = 0; i < deduplicatedColumnSize; i++) {
+ columnTypeDeduplicatedList.add(null);
+ }
+ for (int i = 0; i < columnNameList.size(); i++) {
+ String name;
+ if (sgList != null
+ && sgList.size() > 0
+ && (aliasColumnMap == null || !aliasColumnMap.get(i))) {
+ name = sgList.get(i) + "." + columnNameList.get(i);
+ } else {
+ name = columnNameList.get(i);
+ }
+
+ this.columnNameList.add(name);
+ this.columnTypeList.add(columnTypeList.get(i));
+ // "Time".equals(name) -> to allow the Time column appear in value columns
+ if (!columnOrdinalMap.containsKey(name) || "Time".equals(name)) {
+ int index = columnNameIndex.get(name);
+ if (!columnOrdinalMap.containsValue(index + START_INDEX)) {
+ columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i)));
+ }
+ columnOrdinalMap.put(name, index + START_INDEX);
+ }
+ }
+ } else {
+ this.columnTypeDeduplicatedList = new ArrayList<>();
+ int index = START_INDEX;
+ for (int i = 0; i < columnNameList.size(); i++) {
+ String name = columnNameList.get(i);
+ this.columnNameList.add(name);
+ this.columnTypeList.add(columnTypeList.get(i));
+ if (!columnOrdinalMap.containsKey(name)) {
+ columnOrdinalMap.put(name, index++);
+ columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i)));
+ }
}
}
- this.tsQueryDataSet = queryDataSet;
- this.emptyResultSet = (queryDataSet == null || !queryDataSet.time.hasRemaining());
+
+ this.queryResult = queryResult;
+ this.queryResultSize = 0;
+ if (queryResult != null) {
+ this.queryResultSize = queryResult.size();
+ }
+ this.queryResultIndex = 0;
+ this.tsBlockSize = 0;
+ this.tsBlockIndex = -1;
+ this.emptyResultSet = this.queryResultSize == 0;
}
public void close() throws StatementExecutionException, TException {
@@ -202,7 +271,12 @@ public class IoTDBRpcDataSet {
}
public boolean next() throws StatementExecutionException, IoTDBConnectionException {
- if (hasCachedResults()) {
+ if (hasCachedBlock()) {
+ constructOneRow();
+ return true;
+ }
+ if (hasCachedByteBuffer()) {
+ constructOneTsBlock();
constructOneRow();
return true;
}
@@ -215,7 +289,8 @@ public class IoTDBRpcDataSet {
"Cannot close dataset, because of network connection: {} ", e);
}
}
- if (fetchResults() && hasCachedResults()) {
+ if (isRpcFetchResult && fetchResults() && hasCachedByteBuffer()) {
+ constructOneTsBlock();
constructOneRow();
return true;
} else {
@@ -230,18 +305,25 @@ public class IoTDBRpcDataSet {
}
public boolean fetchResults() throws StatementExecutionException, IoTDBConnectionException {
- rowsIndex = 0;
TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
req.setTimeout(timeout);
try {
- TSFetchResultsResp resp = client.fetchResults(req);
+ TSFetchResultsResp resp = client.fetchResultsV2(req);
RpcUtils.verifySuccess(resp.getStatus());
if (!resp.hasResultSet) {
emptyResultSet = true;
close();
} else {
- tsQueryDataSet = resp.getQueryDataSet();
+ queryResult = resp.getQueryResult();
+ queryResultIndex = 0;
+ queryResultSize = 0;
+ if (queryResult != null) {
+ queryResultSize = queryResult.size();
+ }
+ this.tsBlockSize = 0;
+ this.tsBlockIndex = -1;
+ this.emptyResultSet = this.queryResultSize == 0;
}
return resp.hasResultSet;
} catch (TException e) {
@@ -250,51 +332,35 @@ public class IoTDBRpcDataSet {
}
}
- public boolean hasCachedResults() {
- return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining());
+ public boolean hasCachedBlock() {
+ return (curTsBlock != null && tsBlockIndex < tsBlockSize - 1);
+ }
+
+ public boolean hasCachedByteBuffer() {
+ return (queryResult != null && queryResultIndex < queryResultSize);
}
public void constructOneRow() {
- lastReadWasNull = false;
- tsQueryDataSet.time.get(time);
- for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
- ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
- // another new 8 row, should move the bitmap buffer position to next byte
- if (rowsIndex % 8 == 0) {
- currentBitmap[i] = bitmapBuffer.get();
- }
- if (!isNull(i, rowsIndex)) {
- ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
- TSDataType dataType = columnTypeDeduplicatedList.get(i);
- switch (dataType) {
- case BOOLEAN:
- case INT32:
- case INT64:
- case FLOAT:
- case DOUBLE:
- valueBuffer.get(values[i]);
- break;
- case TEXT:
- int length = valueBuffer.getInt();
- values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
- }
- }
- }
- rowsIndex++;
+ tsBlockIndex++;
hasCachedRecord = true;
}
+ public void constructOneTsBlock() {
+ lastReadWasNull = false;
+ ByteBuffer byteBuffer = queryResult.get(queryResultIndex);
+ queryResultIndex++;
+ curTsBlock = serde.deserialize(byteBuffer);
+ tsBlockIndex = -1;
+ tsBlockSize = curTsBlock.getPositionCount();
+ }
+
public boolean isNull(int columnIndex) throws StatementExecutionException {
int index = columnOrdinalMap.get(findColumnNameByIndex(columnIndex)) - START_INDEX;
// time column will never be null
if (index < 0) {
return true;
}
- return isNull(index, rowsIndex - 1);
+ return isNull(index, tsBlockIndex);
}
public boolean isNull(String columnName) {
@@ -303,13 +369,11 @@ public class IoTDBRpcDataSet {
if (index < 0) {
return true;
}
- return isNull(index, rowsIndex - 1);
+ return isNull(index, tsBlockIndex);
}
private boolean isNull(int index, int rowNum) {
- byte bitmap = currentBitmap[index];
- int shift = rowNum % 8;
- return ((FLAG >>> shift) & (bitmap & 0xff)) == 0;
+ return curTsBlock.getColumn(index).isNull(rowNum);
}
public boolean getBoolean(int columnIndex) throws StatementExecutionException {
@@ -319,9 +383,9 @@ public class IoTDBRpcDataSet {
public boolean getBoolean(String columnName) throws StatementExecutionException {
checkRecord();
int index = columnOrdinalMap.get(columnName) - START_INDEX;
- if (!isNull(index, rowsIndex - 1)) {
+ if (!isNull(index, tsBlockIndex)) {
lastReadWasNull = false;
- return BytesUtils.bytesToBool(values[index]);
+ return curTsBlock.getColumn(index).getBoolean(tsBlockIndex);
} else {
lastReadWasNull = true;
return false;
@@ -335,9 +399,9 @@ public class IoTDBRpcDataSet {
public double getDouble(String columnName) throws StatementExecutionException {
checkRecord();
int index = columnOrdinalMap.get(columnName) - START_INDEX;
- if (!isNull(index, rowsIndex - 1)) {
+ if (!isNull(index, tsBlockIndex)) {
lastReadWasNull = false;
- return BytesUtils.bytesToDouble(values[index]);
+ return curTsBlock.getColumn(index).getDouble(tsBlockIndex);
} else {
lastReadWasNull = true;
return 0;
@@ -351,9 +415,9 @@ public class IoTDBRpcDataSet {
public float getFloat(String columnName) throws StatementExecutionException {
checkRecord();
int index = columnOrdinalMap.get(columnName) - START_INDEX;
- if (!isNull(index, rowsIndex - 1)) {
+ if (!isNull(index, tsBlockIndex)) {
lastReadWasNull = false;
- return BytesUtils.bytesToFloat(values[index]);
+ return curTsBlock.getColumn(index).getFloat(tsBlockIndex);
} else {
lastReadWasNull = true;
return 0;
@@ -367,9 +431,14 @@ public class IoTDBRpcDataSet {
public int getInt(String columnName) throws StatementExecutionException {
checkRecord();
int index = columnOrdinalMap.get(columnName) - START_INDEX;
- if (!isNull(index, rowsIndex - 1)) {
+ if (!isNull(index, tsBlockIndex)) {
lastReadWasNull = false;
- return BytesUtils.bytesToInt(values[index]);
+ TSDataType type = curTsBlock.getColumn(index).getDataType();
+ if (type == TSDataType.INT64) {
+ return (int) curTsBlock.getColumn(index).getLong(tsBlockIndex);
+ } else {
+ return curTsBlock.getColumn(index).getInt(tsBlockIndex);
+ }
} else {
lastReadWasNull = true;
return 0;
@@ -383,18 +452,39 @@ public class IoTDBRpcDataSet {
public long getLong(String columnName) throws StatementExecutionException {
checkRecord();
if (columnName.equals(TIMESTAMP_STR)) {
- return BytesUtils.bytesToLong(time);
+ return curTsBlock.getTimeByIndex(tsBlockIndex);
}
int index = columnOrdinalMap.get(columnName) - START_INDEX;
- if (!isNull(index, rowsIndex - 1)) {
+ if (!isNull(index, tsBlockIndex)) {
lastReadWasNull = false;
- return BytesUtils.bytesToLong(values[index]);
+ TSDataType type = curTsBlock.getColumn(index).getDataType();
+ if (type == TSDataType.INT32) {
+ return curTsBlock.getColumn(index).getInt(tsBlockIndex);
+ } else {
+ return curTsBlock.getColumn(index).getLong(tsBlockIndex);
+ }
} else {
lastReadWasNull = true;
return 0;
}
}
+ public Binary getBinary(int columIndex) throws StatementExecutionException {
+ return getBinary(findColumnNameByIndex(columIndex));
+ }
+
+ public Binary getBinary(String columnName) throws StatementExecutionException {
+ checkRecord();
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
+ if (!isNull(index, tsBlockIndex)) {
+ lastReadWasNull = false;
+ return curTsBlock.getColumn(index).getBinary(tsBlockIndex);
+ } else {
+ lastReadWasNull = true;
+ return null;
+ }
+ }
+
public Object getObject(int columnIndex) throws StatementExecutionException {
return getObject(findColumnNameByIndex(columnIndex));
}
@@ -419,6 +509,10 @@ public class IoTDBRpcDataSet {
return getTimestamp(findColumn(columnName));
}
+ public Timestamp getTimestamp() throws StatementExecutionException {
+ return new Timestamp(curTsBlock.getTimeByIndex(tsBlockIndex));
+ }
+
public int findColumn(String columnName) {
return columnOrdinalMap.get(columnName);
}
@@ -426,31 +520,31 @@ public class IoTDBRpcDataSet {
public String getValueByName(String columnName) throws StatementExecutionException {
checkRecord();
if (columnName.equals(TIMESTAMP_STR)) {
- return String.valueOf(BytesUtils.bytesToLong(time));
+ return String.valueOf(curTsBlock.getTimeByIndex(tsBlockIndex));
}
int index = columnOrdinalMap.get(columnName) - START_INDEX;
- if (index < 0 || index >= values.length || isNull(index, rowsIndex - 1)) {
+ if (index < 0 || index >= columnTypeDeduplicatedList.size() || isNull(index, tsBlockIndex)) {
lastReadWasNull = true;
return null;
}
lastReadWasNull = false;
- return getString(index, columnTypeDeduplicatedList.get(index), values);
+ return getString(index, columnTypeDeduplicatedList.get(index));
}
- public String getString(int index, TSDataType tsDataType, byte[][] values) {
+ public String getString(int index, TSDataType tsDataType) {
switch (tsDataType) {
case BOOLEAN:
- return String.valueOf(BytesUtils.bytesToBool(values[index]));
+ return String.valueOf(curTsBlock.getColumn(index).getBoolean(tsBlockIndex));
case INT32:
- return String.valueOf(BytesUtils.bytesToInt(values[index]));
+ return String.valueOf(curTsBlock.getColumn(index).getInt(tsBlockIndex));
case INT64:
- return String.valueOf(BytesUtils.bytesToLong(values[index]));
+ return String.valueOf(curTsBlock.getColumn(index).getLong(tsBlockIndex));
case FLOAT:
- return String.valueOf(BytesUtils.bytesToFloat(values[index]));
+ return String.valueOf(curTsBlock.getColumn(index).getFloat(tsBlockIndex));
case DOUBLE:
- return String.valueOf(BytesUtils.bytesToDouble(values[index]));
+ return String.valueOf(curTsBlock.getColumn(index).getDouble(tsBlockIndex));
case TEXT:
- return new String(values[index], StandardCharsets.UTF_8);
+ return curTsBlock.getColumn(index).getBinary(tsBlockIndex).getStringValue();
default:
return null;
}
@@ -459,34 +553,19 @@ public class IoTDBRpcDataSet {
public Object getObjectByName(String columnName) throws StatementExecutionException {
checkRecord();
if (columnName.equals(TIMESTAMP_STR)) {
- return BytesUtils.bytesToLong(time);
+ return curTsBlock.getTimeByIndex(tsBlockIndex);
}
int index = columnOrdinalMap.get(columnName) - START_INDEX;
- if (index < 0 || index >= values.length || isNull(index, rowsIndex - 1)) {
+ if (index < 0 || index >= columnTypeDeduplicatedList.size() || isNull(index, tsBlockIndex)) {
lastReadWasNull = true;
return null;
}
lastReadWasNull = false;
- return getObject(index, columnTypeDeduplicatedList.get(index), values);
+ return getObject(index, columnTypeDeduplicatedList.get(index));
}
- public Object getObject(int index, TSDataType tsDataType, byte[][] values) {
- switch (tsDataType) {
- case BOOLEAN:
- return BytesUtils.bytesToBool(values[index]);
- case INT32:
- return BytesUtils.bytesToInt(values[index]);
- case INT64:
- return BytesUtils.bytesToLong(values[index]);
- case FLOAT:
- return BytesUtils.bytesToFloat(values[index]);
- case DOUBLE:
- return BytesUtils.bytesToDouble(values[index]);
- case TEXT:
- return new String(values[index], StandardCharsets.UTF_8);
- default:
- return null;
- }
+ public Object getObject(int index, TSDataType tsDataType) {
+ return curTsBlock.getColumn(index).getObject(tsBlockIndex);
}
public String findColumnNameByIndex(int columnIndex) throws StatementExecutionException {
@@ -501,13 +580,11 @@ public class IoTDBRpcDataSet {
}
public void checkRecord() throws StatementExecutionException {
- if (Objects.isNull(tsQueryDataSet)) {
+ if (queryResultIndex > queryResultSize
+ || tsBlockIndex >= tsBlockSize
+ || queryResult == null
+ || curTsBlock == null) {
throw new StatementExecutionException("No record remains");
}
}
-
- public void setTsQueryDataSet(TSQueryDataSet tsQueryDataSet) {
- this.tsQueryDataSet = tsQueryDataSet;
- this.emptyResultSet = (tsQueryDataSet == null || !tsQueryDataSet.time.hasRemaining());
- }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index abb8f56650..0be606c4d0 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -120,6 +120,7 @@ public enum TSStatusCode {
SHUT_DOWN_ERROR(505),
MULTIPLE_ERROR(506),
SESSION_EXPIRED(507),
+ TSBLOCK_SERIALIZE_ERROR(508),
WRONG_LOGIN_PASSWORD_ERROR(600),
NOT_LOGIN_ERROR(601),
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 43352fdc32..983f13e054 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -344,14 +344,14 @@ public class SessionConnection {
TSExecuteStatementResp execResp;
try {
execReq.setEnableRedirectQuery(enableRedirect);
- execResp = client.executeQueryStatement(execReq);
+ execResp = client.executeQueryStatementV2(execReq);
RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
} catch (TException e) {
if (reconnect()) {
try {
execReq.setSessionId(sessionId);
execReq.setStatementId(statementId);
- execResp = client.executeQueryStatement(execReq);
+ execResp = client.executeQueryStatementV2(execReq);
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
@@ -370,7 +370,7 @@ public class SessionConnection {
statementId,
client,
sessionId,
- execResp.queryDataSet,
+ execResp.queryResult,
execResp.isIgnoreTimeStamp(),
timeout);
}
@@ -380,14 +380,14 @@ public class SessionConnection {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
try {
execReq.setEnableRedirectQuery(enableRedirect);
- TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
+ TSExecuteStatementResp execResp = client.executeUpdateStatementV2(execReq);
RpcUtils.verifySuccess(execResp.getStatus());
} catch (TException e) {
if (reconnect()) {
try {
execReq.setSessionId(sessionId);
execReq.setStatementId(statementId);
- RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).status);
+ RpcUtils.verifySuccess(client.executeUpdateStatementV2(execReq).status);
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
@@ -407,14 +407,14 @@ public class SessionConnection {
TSExecuteStatementResp execResp;
try {
execReq.setEnableRedirectQuery(enableRedirect);
- execResp = client.executeRawDataQuery(execReq);
+ execResp = client.executeRawDataQueryV2(execReq);
RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
} catch (TException e) {
if (reconnect()) {
try {
execReq.setSessionId(sessionId);
execReq.setStatementId(statementId);
- execResp = client.executeRawDataQuery(execReq);
+ execResp = client.executeRawDataQueryV2(execReq);
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
@@ -433,7 +433,7 @@ public class SessionConnection {
statementId,
client,
sessionId,
- execResp.queryDataSet,
+ execResp.queryResult,
execResp.isIgnoreTimeStamp());
}
@@ -446,14 +446,14 @@ public class SessionConnection {
tsLastDataQueryReq.setTimeout(timeOut);
TSExecuteStatementResp tsExecuteStatementResp;
try {
- tsExecuteStatementResp = client.executeLastDataQuery(tsLastDataQueryReq);
+ tsExecuteStatementResp = client.executeLastDataQueryV2(tsLastDataQueryReq);
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
} catch (TException e) {
if (reconnect()) {
try {
tsLastDataQueryReq.setSessionId(sessionId);
tsLastDataQueryReq.setStatementId(statementId);
- tsExecuteStatementResp = client.executeLastDataQuery(tsLastDataQueryReq);
+ tsExecuteStatementResp = client.executeLastDataQueryV2(tsLastDataQueryReq);
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
@@ -472,7 +472,7 @@ public class SessionConnection {
statementId,
client,
sessionId,
- tsExecuteStatementResp.queryDataSet,
+ tsExecuteStatementResp.queryResult,
tsExecuteStatementResp.isIgnoreTimeStamp());
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index f991759309..f7b725ab73 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -22,16 +22,14 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.IoTDBRpcDataSet;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.thrift.TException;
+import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
@@ -52,7 +50,7 @@ public class SessionDataSet implements AutoCloseable {
long statementId,
IClientRPCService.Iface client,
long sessionId,
- TSQueryDataSet queryDataSet,
+ List<ByteBuffer> queryResult,
boolean ignoreTimeStamp) {
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
@@ -61,11 +59,12 @@ public class SessionDataSet implements AutoCloseable {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
+ true,
queryId,
statementId,
client,
sessionId,
- queryDataSet,
+ queryResult,
SessionConfig.DEFAULT_FETCH_SIZE,
0);
}
@@ -79,7 +78,7 @@ public class SessionDataSet implements AutoCloseable {
long statementId,
IClientRPCService.Iface client,
long sessionId,
- TSQueryDataSet queryDataSet,
+ List<ByteBuffer> queryResult,
boolean ignoreTimeStamp,
long timeout) {
this.ioTDBRpcDataSet =
@@ -89,11 +88,12 @@ public class SessionDataSet implements AutoCloseable {
columnTypeList,
columnNameIndex,
ignoreTimeStamp,
+ true,
queryId,
statementId,
client,
sessionId,
- queryDataSet,
+ queryResult,
SessionConfig.DEFAULT_FETCH_SIZE,
timeout);
}
@@ -134,32 +134,31 @@ public class SessionDataSet implements AutoCloseable {
- START_INDEX;
if (!ioTDBRpcDataSet.isNull(datasetColumnIndex)) {
- byte[] valueBytes = ioTDBRpcDataSet.values[loc];
TSDataType dataType = ioTDBRpcDataSet.columnTypeDeduplicatedList.get(loc);
field = new Field(dataType);
switch (dataType) {
case BOOLEAN:
- boolean booleanValue = BytesUtils.bytesToBool(valueBytes);
+ boolean booleanValue = ioTDBRpcDataSet.getBoolean(datasetColumnIndex);
field.setBoolV(booleanValue);
break;
case INT32:
- int intValue = BytesUtils.bytesToInt(valueBytes);
+ int intValue = ioTDBRpcDataSet.getInt(datasetColumnIndex);
field.setIntV(intValue);
break;
case INT64:
- long longValue = BytesUtils.bytesToLong(valueBytes);
+ long longValue = ioTDBRpcDataSet.getLong(datasetColumnIndex);
field.setLongV(longValue);
break;
case FLOAT:
- float floatValue = BytesUtils.bytesToFloat(valueBytes);
+ float floatValue = ioTDBRpcDataSet.getFloat(datasetColumnIndex);
field.setFloatV(floatValue);
break;
case DOUBLE:
- double doubleValue = BytesUtils.bytesToDouble(valueBytes);
+ double doubleValue = ioTDBRpcDataSet.getDouble(datasetColumnIndex);
field.setDoubleV(doubleValue);
break;
case TEXT:
- field.setBinaryV(new Binary(valueBytes));
+ field.setBinaryV(ioTDBRpcDataSet.getBinary(datasetColumnIndex));
break;
default:
throw new UnSupportedDataTypeException(
@@ -172,7 +171,7 @@ public class SessionDataSet implements AutoCloseable {
}
outFields.add(field);
}
- return new RowRecord(BytesUtils.bytesToLong(ioTDBRpcDataSet.time), outFields);
+ return new RowRecord(ioTDBRpcDataSet.getTimestamp().getTime(), outFields);
}
public RowRecord next() throws StatementExecutionException, IoTDBConnectionException {
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 83d15737e1..dcf2171d13 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
include "common.thrift"
namespace java org.apache.iotdb.service.rpc.thrift
namespace py iotdb.thrift.rpc
@@ -66,6 +67,7 @@ struct TSExecuteStatementResp {
10: optional list<string> sgColumns
11: optional list<byte> aliasColumns
12: optional TSTracingInfo tracingInfo
+ 13: optional list<binary> queryResult
}
enum TSProtocolVersion {
@@ -173,6 +175,7 @@ struct TSFetchResultsResp{
3: required bool isAlign
4: optional TSQueryDataSet queryDataSet
5: optional TSQueryNonAlignDataSet nonAlignQueryDataSet
+ 6: optional list<binary> queryResult
}
struct TSFetchMetadataResp{
@@ -430,6 +433,19 @@ struct TSyncTransportMetaInfo{
}
service IClientRPCService {
+
+ TSExecuteStatementResp executeQueryStatementV2(1:TSExecuteStatementReq req);
+
+ TSExecuteStatementResp executeUpdateStatementV2(1:TSExecuteStatementReq req);
+
+ TSExecuteStatementResp executeStatementV2(1:TSExecuteStatementReq req);
+
+ TSExecuteStatementResp executeRawDataQueryV2(1:TSRawDataQueryReq req);
+
+ TSExecuteStatementResp executeLastDataQueryV2(1:TSLastDataQueryReq req);
+
+ TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req);
+
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
common.TSStatus closeSession(1:TSCloseSessionReq req);
@@ -442,9 +458,9 @@ service IClientRPCService {
TSExecuteStatementResp executeUpdateStatement(1:TSExecuteStatementReq req);
- TSFetchResultsResp fetchResults(1:TSFetchResultsReq req)
+ TSFetchResultsResp fetchResults(1:TSFetchResultsReq req);
- TSFetchMetadataResp fetchMetadata(1:TSFetchMetadataReq req)
+ TSFetchMetadataResp fetchMetadata(1:TSFetchMetadataReq req);
common.TSStatus cancelOperation(1:TSCancelOperationReq req);