You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/04/22 07:28:04 UTC
[iotdb] 01/01: Cluster Test refine: Demo 1
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch feature/it-demo
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7d5490b9481dfc4232dfc7b9d58477b9ecbad15c
Author: ericpai <er...@hotmail.com>
AuthorDate: Fri Apr 22 15:26:14 2022 +0800
Cluster Test refine: Demo 1
---
checkstyle.xml | 24 +-
.../iotdb/cluster/server/ClusterTSServiceImpl.java | 7 +
.../iotdb/integration/env/ClusterEnvBase.java | 30 +-
.../itbase/runtime/ClusterTestConnection.java | 300 +++++++
.../iotdb/itbase/runtime/ClusterTestResultSet.java | 865 +++++++++++++++++++++
.../iotdb/itbase/runtime/ClusterTestStatement.java | 284 +++++++
.../aggregation/IoTDBAggregationIT.java | 34 +-
.../db/service/thrift/impl/TSServiceImpl.java | 219 +++---
8 files changed, 1616 insertions(+), 147 deletions(-)
diff --git a/checkstyle.xml b/checkstyle.xml
index 8dd797112f..4d4eb175a9 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -1,5 +1,4 @@
-<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE module PUBLIC "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
- "https://checkstyle.org/dtds/configuration_1_3.dtd">
+<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE module PUBLIC "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN" "https://checkstyle.org/dtds/configuration_1_3.dtd">
<!--
@@ -45,10 +44,8 @@
<module name="OuterTypeFilename"/>
<module name="IllegalTokenText">
<property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
- <property name="format"
- value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
- <property name="message"
- value="Consider using special escape sequence instead of octal value or Unicode escaped value."/>
+ <property name="format" value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
+ <property name="message" value="Consider using special escape sequence instead of octal value or Unicode escaped value."/>
</module>
<module name="AvoidEscapedUnicodeCharacters">
<property name="allowEscapesForControlCharacters" value="true"/>
@@ -72,22 +69,19 @@
<module name="LeftCurly"/>
<module name="RightCurly">
<property name="id" value="RightCurlySame"/>
- <property name="tokens"
- value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_DO"/>
+ <property name="tokens" value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_DO"/>
</module>
<module name="RightCurly">
<property name="id" value="RightCurlyAlone"/>
<property name="option" value="alone"/>
- <property name="tokens"
- value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, INSTANCE_INIT"/>
+ <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, INSTANCE_INIT"/>
</module>
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyMethods" value="true"/>
<property name="allowEmptyTypes" value="true"/>
<property name="allowEmptyLoops" value="true"/>
- <message key="ws.notFollowed"
- value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/>
+ <message key="ws.notFollowed" value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/>
<message key="ws.notPreceded" value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/>
</module>
<module name="OneStatementPerLine"/>
@@ -196,8 +190,7 @@
<module name="ParenPad"/>
<module name="OperatorWrap">
<property name="option" value="NL"/>
- <property name="tokens"
- value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/>
+ <property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/>
</module>
<module name="AnnotationLocation">
<property name="id" value="AnnotationLocationMostCases"/>
@@ -211,8 +204,7 @@
<module name="NonEmptyAtclauseDescription"/>
<module name="JavadocTagContinuationIndentation"/>
<module name="SummaryJavadoc">
- <property name="forbiddenSummaryFragments"
- value="^@return the *|^This method returns |^A [{]@code [a-zA-Z0-9]+[}]( is a )"/>
+ <property name="forbiddenSummaryFragments" value="^@return the *|^This method returns |^A [{]@code [a-zA-Z0-9]+[}]( is a )"/>
</module>
<module name="JavadocParagraph"/>
<module name="AtclauseOrder">
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
index 0c7528dc22..f740f6aaf0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.cluster.server;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
import org.apache.iotdb.cluster.server.basic.ClusterServiceProvider;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
@@ -44,4 +46,9 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
return clusterServiceProvider.executeNonQueryPlan(plan);
}
+
+ @Override
+ public SessionManager getSessionManager() {
+ return ClusterSessionManager.getInstance();
+ }
}
diff --git a/integration/src/main/java/org/apache/iotdb/integration/env/ClusterEnvBase.java b/integration/src/main/java/org/apache/iotdb/integration/env/ClusterEnvBase.java
index b7ae7ff61c..06e153c72e 100644
--- a/integration/src/main/java/org/apache/iotdb/integration/env/ClusterEnvBase.java
+++ b/integration/src/main/java/org/apache/iotdb/integration/env/ClusterEnvBase.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.integration.env;
import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.runtime.ClusterTestConnection;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.jdbc.Constant;
import org.apache.iotdb.jdbc.IoTDBConnection;
@@ -37,7 +38,6 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.iotdb.jdbc.Config.VERSION;
import static org.junit.Assert.fail;
public abstract class ClusterEnvBase implements BaseEnv {
@@ -212,11 +212,11 @@ public abstract class ClusterEnvBase implements BaseEnv {
@Override
public Connection getConnection() throws SQLException {
- Connection connection = null;
-
+ Connection writeConnection = null;
+ List<Connection> readConnections = new ArrayList<>();
try {
Class.forName(Config.JDBC_DRIVER_NAME);
- connection =
+ Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX
+ this.nodes.get(0).getIp()
@@ -224,11 +224,29 @@ public abstract class ClusterEnvBase implements BaseEnv {
+ this.nodes.get(0).getPort(),
System.getProperty("User", "root"),
System.getProperty("Password", "root"));
+ writeConnection = connection;
} catch (ClassNotFoundException e) {
e.printStackTrace();
fail();
}
- return connection;
+ for (int i = 1; i < this.nodes.size(); i++) {
+ try {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ Connection readConnection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX
+ + this.nodes.get(i).getIp()
+ + ":"
+ + this.nodes.get(i).getPort(),
+ System.getProperty("User", "root"),
+ System.getProperty("Password", "root"));
+ readConnections.add(readConnection);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+ return new ClusterTestConnection(writeConnection, readConnections);
}
public IoTDBConnection getConnection(int queryTimeout) throws SQLException {
@@ -265,7 +283,7 @@ public abstract class ClusterEnvBase implements BaseEnv {
+ ":"
+ this.nodes.get(0).getPort()
+ "?"
- + VERSION
+ + Config.VERSION
+ "="
+ version.toString(),
System.getProperty("User", "root"),
diff --git a/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
new file mode 100644
index 0000000000..5437c550a1
--- /dev/null
+++ b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestConnection.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.itbase.runtime;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+public class ClusterTestConnection implements Connection {
+
+ private final Connection writeConn;
+ private final List<Connection> readConns;
+
+ public ClusterTestConnection(Connection writeConn, List<Connection> readConns) {
+ this.writeConn = writeConn;
+ this.readConns = readConns;
+ }
+
+ @Override
+ public Statement createStatement() throws SQLException {
+ return new ClusterTestStatement(writeConn, readConns);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String nativeSQL(String sql) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean getAutoCommit() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void commit() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void rollback() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws SQLException {}
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {}
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {}
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {}
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {}
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {}
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {}
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {}
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {}
+
+ @Override
+ public Statement createStatement(
+ int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(
+ String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(
+ String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {}
+
+ @Override
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {}
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {}
+
+ @Override
+ public String getSchema() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void abort(Executor executor) throws SQLException {}
+
+ @Override
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {}
+
+ @Override
+ public int getNetworkTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+}
diff --git a/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
new file mode 100644
index 0000000000..e9f6b7714e
--- /dev/null
+++ b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestResultSet.java
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.itbase.runtime;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ClusterTestResultSet implements ResultSet {
+
+ private final List<ResultSet> resultSets;
+
+ public ClusterTestResultSet(List<Statement> statements, String sql) throws SQLException {
+ resultSets = new ArrayList<>(statements.size());
+ for (Statement statement : statements) {
+ resultSets.add(statement.executeQuery(sql));
+ }
+ }
+
+ @Override
+ public boolean next() throws SQLException {
+ boolean hasNext = resultSets.get(0).next();
+ for (int i = 1; i < resultSets.size(); i++) {
+ boolean currentSetHasNext = resultSets.get(i).next();
+ if (currentSetHasNext != hasNext) {
+ throw new SQLException("datasets are inconsistent!");
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ SQLException ex = null;
+ for (ResultSet rs : resultSets) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ ex = e;
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ @Override
+ public boolean wasNull() throws SQLException {
+ boolean wasNull = resultSets.get(0).wasNull();
+ for (int i = 1; i < resultSets.size(); i++) {
+ boolean currentWasNull = resultSets.get(i).wasNull();
+ if (currentWasNull != wasNull) {
+ throw new SQLException("datasets are inconsistent!");
+ }
+ }
+ return wasNull;
+ }
+
+ @Override
+ public String getString(String columnLabel) throws SQLException {
+ String result = resultSets.get(0).getString(columnLabel);
+ System.out.println("Result 0 get [" + columnLabel + "]: " + result);
+ for (int i = 1; i < resultSets.size(); i++) {
+ String currentResult = resultSets.get(i).getString(columnLabel);
+ System.out.println("Result " + i + " get [" + columnLabel + "]: " + result);
+ if (!Objects.equals(result, currentResult)) {
+ throw new SQLException("datasets are inconsistent!");
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public String getString(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean getBoolean(int columnIndex) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public byte getByte(int columnIndex) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public short getShort(int columnIndex) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int getInt(int columnIndex) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public long getLong(int columnIndex) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public float getFloat(int columnIndex) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public double getDouble(int columnIndex) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public byte[] getBytes(int columnIndex) throws SQLException {
+ return new byte[0];
+ }
+
+ @Override
+ public Date getDate(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Time getTime(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Timestamp getTimestamp(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public InputStream getAsciiStream(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public InputStream getUnicodeStream(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public InputStream getBinaryStream(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean getBoolean(String columnLabel) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public byte getByte(String columnLabel) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public short getShort(String columnLabel) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int getInt(String columnLabel) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public long getLong(String columnLabel) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public float getFloat(String columnLabel) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public double getDouble(String columnLabel) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public byte[] getBytes(String columnLabel) throws SQLException {
+ return new byte[0];
+ }
+
+ @Override
+ public Date getDate(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Time getTime(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Timestamp getTimestamp(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public InputStream getAsciiStream(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public InputStream getUnicodeStream(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public InputStream getBinaryStream(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {}
+
+ @Override
+ public String getCursorName() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public ResultSetMetaData getMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object getObject(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object getObject(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public int findColumn(String columnLabel) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public Reader getCharacterStream(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Reader getCharacterStream(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isBeforeFirst() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean isAfterLast() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean isFirst() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean isLast() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void beforeFirst() throws SQLException {}
+
+ @Override
+ public void afterLast() throws SQLException {}
+
+ @Override
+ public boolean first() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean last() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public int getRow() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public boolean absolute(int row) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean relative(int rows) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean previous() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {}
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {}
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int getType() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int getConcurrency() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public boolean rowUpdated() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean rowInserted() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean rowDeleted() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void updateNull(int columnIndex) throws SQLException {}
+
+ @Override
+ public void updateBoolean(int columnIndex, boolean x) throws SQLException {}
+
+ @Override
+ public void updateByte(int columnIndex, byte x) throws SQLException {}
+
+ @Override
+ public void updateShort(int columnIndex, short x) throws SQLException {}
+
+ @Override
+ public void updateInt(int columnIndex, int x) throws SQLException {}
+
+ @Override
+ public void updateLong(int columnIndex, long x) throws SQLException {}
+
+ @Override
+ public void updateFloat(int columnIndex, float x) throws SQLException {}
+
+ @Override
+ public void updateDouble(int columnIndex, double x) throws SQLException {}
+
+ @Override
+ public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException {}
+
+ @Override
+ public void updateString(int columnIndex, String x) throws SQLException {}
+
+ @Override
+ public void updateBytes(int columnIndex, byte[] x) throws SQLException {}
+
+ @Override
+ public void updateDate(int columnIndex, Date x) throws SQLException {}
+
+ @Override
+ public void updateTime(int columnIndex, Time x) throws SQLException {}
+
+ @Override
+ public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException {}
+
+ @Override
+ public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException {}
+
+ @Override
+ public void updateBinaryStream(int columnIndex, InputStream x, int length) throws SQLException {}
+
+ @Override
+ public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException {}
+
+ @Override
+ public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException {}
+
+ @Override
+ public void updateObject(int columnIndex, Object x) throws SQLException {}
+
+ @Override
+ public void updateNull(String columnLabel) throws SQLException {}
+
+ @Override
+ public void updateBoolean(String columnLabel, boolean x) throws SQLException {}
+
+ @Override
+ public void updateByte(String columnLabel, byte x) throws SQLException {}
+
+ @Override
+ public void updateShort(String columnLabel, short x) throws SQLException {}
+
+ @Override
+ public void updateInt(String columnLabel, int x) throws SQLException {}
+
+ @Override
+ public void updateLong(String columnLabel, long x) throws SQLException {}
+
+ @Override
+ public void updateFloat(String columnLabel, float x) throws SQLException {}
+
+ @Override
+ public void updateDouble(String columnLabel, double x) throws SQLException {}
+
+ @Override
+ public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException {}
+
+ @Override
+ public void updateString(String columnLabel, String x) throws SQLException {}
+
+ @Override
+ public void updateBytes(String columnLabel, byte[] x) throws SQLException {}
+
+ @Override
+ public void updateDate(String columnLabel, Date x) throws SQLException {}
+
+ @Override
+ public void updateTime(String columnLabel, Time x) throws SQLException {}
+
+ @Override
+ public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException {}
+
+ @Override
+ public void updateAsciiStream(String columnLabel, InputStream x, int length)
+ throws SQLException {}
+
+ @Override
+ public void updateBinaryStream(String columnLabel, InputStream x, int length)
+ throws SQLException {}
+
+ @Override
+ public void updateCharacterStream(String columnLabel, Reader reader, int length)
+ throws SQLException {}
+
+ @Override
+ public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException {}
+
+ @Override
+ public void updateObject(String columnLabel, Object x) throws SQLException {}
+
+ @Override
+ public void insertRow() throws SQLException {}
+
+ @Override
+ public void updateRow() throws SQLException {}
+
+ @Override
+ public void deleteRow() throws SQLException {}
+
+ @Override
+ public void refreshRow() throws SQLException {}
+
+ @Override
+ public void cancelRowUpdates() throws SQLException {}
+
+ @Override
+ public void moveToInsertRow() throws SQLException {}
+
+ @Override
+ public void moveToCurrentRow() throws SQLException {}
+
+ @Override
+ public Statement getStatement() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object getObject(int columnIndex, Map<String, Class<?>> map) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Ref getRef(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Blob getBlob(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Clob getClob(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Array getArray(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object getObject(String columnLabel, Map<String, Class<?>> map) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Ref getRef(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Blob getBlob(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Clob getClob(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Array getArray(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Date getDate(int columnIndex, Calendar cal) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Date getDate(String columnLabel, Calendar cal) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Time getTime(int columnIndex, Calendar cal) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Time getTime(String columnLabel, Calendar cal) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public URL getURL(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public URL getURL(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void updateRef(int columnIndex, Ref x) throws SQLException {}
+
+ @Override
+ public void updateRef(String columnLabel, Ref x) throws SQLException {}
+
+ @Override
+ public void updateBlob(int columnIndex, Blob x) throws SQLException {}
+
+ @Override
+ public void updateBlob(String columnLabel, Blob x) throws SQLException {}
+
+ @Override
+ public void updateClob(int columnIndex, Clob x) throws SQLException {}
+
+ @Override
+ public void updateClob(String columnLabel, Clob x) throws SQLException {}
+
+ @Override
+ public void updateArray(int columnIndex, Array x) throws SQLException {}
+
+ @Override
+ public void updateArray(String columnLabel, Array x) throws SQLException {}
+
+ @Override
+ public RowId getRowId(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public RowId getRowId(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void updateRowId(int columnIndex, RowId x) throws SQLException {}
+
+ @Override
+ public void updateRowId(String columnLabel, RowId x) throws SQLException {}
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void updateNString(int columnIndex, String nString) throws SQLException {}
+
+ @Override
+ public void updateNString(String columnLabel, String nString) throws SQLException {}
+
+ @Override
+ public void updateNClob(int columnIndex, NClob nClob) throws SQLException {}
+
+ @Override
+ public void updateNClob(String columnLabel, NClob nClob) throws SQLException {}
+
+ @Override
+ public NClob getNClob(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public NClob getNClob(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public SQLXML getSQLXML(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public SQLXML getSQLXML(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException {}
+
+ @Override
+ public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException {}
+
+ @Override
+ public String getNString(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public String getNString(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Reader getNCharacterStream(int columnIndex) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Reader getNCharacterStream(String columnLabel) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void updateNCharacterStream(int columnIndex, Reader x, long length) throws SQLException {}
+
+ @Override
+ public void updateNCharacterStream(String columnLabel, Reader reader, long length)
+ throws SQLException {}
+
+ @Override
+ public void updateAsciiStream(int columnIndex, InputStream x, long length) throws SQLException {}
+
+ @Override
+ public void updateBinaryStream(int columnIndex, InputStream x, long length) throws SQLException {}
+
+ @Override
+ public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException {}
+
+ @Override
+ public void updateAsciiStream(String columnLabel, InputStream x, long length)
+ throws SQLException {}
+
+ @Override
+ public void updateBinaryStream(String columnLabel, InputStream x, long length)
+ throws SQLException {}
+
+ @Override
+ public void updateCharacterStream(String columnLabel, Reader reader, long length)
+ throws SQLException {}
+
+ @Override
+ public void updateBlob(int columnIndex, InputStream inputStream, long length)
+ throws SQLException {}
+
+ @Override
+ public void updateBlob(String columnLabel, InputStream inputStream, long length)
+ throws SQLException {}
+
+ @Override
+ public void updateClob(int columnIndex, Reader reader, long length) throws SQLException {}
+
+ @Override
+ public void updateClob(String columnLabel, Reader reader, long length) throws SQLException {}
+
+ @Override
+ public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException {}
+
+ @Override
+ public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException {}
+
+ @Override
+ public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException {}
+
+ @Override
+ public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException {}
+
+ @Override
+ public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException {}
+
+ @Override
+ public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException {}
+
+ @Override
+ public void updateCharacterStream(int columnIndex, Reader x) throws SQLException {}
+
+ @Override
+ public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException {}
+
+ @Override
+ public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException {}
+
+ @Override
+ public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException {}
+
+ @Override
+ public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException {}
+
+ @Override
+ public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException {}
+
+ @Override
+ public void updateClob(int columnIndex, Reader reader) throws SQLException {}
+
+ @Override
+ public void updateClob(String columnLabel, Reader reader) throws SQLException {}
+
+ @Override
+ public void updateNClob(int columnIndex, Reader reader) throws SQLException {}
+
+ @Override
+ public void updateNClob(String columnLabel, Reader reader) throws SQLException {}
+
+ @Override
+ public <T> T getObject(int columnIndex, Class<T> type) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+}
diff --git a/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
new file mode 100644
index 0000000000..bffeb95601
--- /dev/null
+++ b/integration/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.itbase.runtime;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterTestStatement implements Statement {
+
+ private final Statement writeStatement;
+ private final List<Statement> readStatements = new ArrayList<>();
+ private boolean closed = false;
+ private int maxRows = Integer.MAX_VALUE;
+ private int queryTimeout = Integer.MAX_VALUE;
+
+ public ClusterTestStatement(Connection writeConnection, List<Connection> readConnections)
+ throws SQLException {
+ this.writeStatement = writeConnection.createStatement();
+ updateConfig(writeStatement);
+ for (Connection readConnection : readConnections) {
+ Statement readStatement = readConnection.createStatement();
+ this.readStatements.add(readStatement);
+ updateConfig(readStatement);
+ }
+ }
+
+ private void updateConfig(Statement statement) throws SQLException {
+ maxRows = Math.min(statement.getMaxRows(), maxRows);
+ queryTimeout = Math.min(statement.getQueryTimeout(), queryTimeout);
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException {
+ return new ClusterTestResultSet(readStatements, sql);
+ }
+
+ @Override
+ public int executeUpdate(String sql) throws SQLException {
+ return writeStatement.executeUpdate(sql);
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (writeStatement != null) {
+ writeStatement.close();
+ }
+ readStatements.forEach(
+ r -> {
+ try {
+ r.close();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ closed = true;
+ }
+
+ @Override
+ public int getMaxFieldSize() throws SQLException {
+ return writeStatement.getMaxFieldSize();
+ }
+
+ @Override
+ public void setMaxFieldSize(int max) throws SQLException {
+ writeStatement.setMaxFieldSize(max);
+ }
+
+ @Override
+ public int getMaxRows() {
+ return maxRows;
+ }
+
+ @Override
+ public void setMaxRows(int max) throws SQLException {
+ for (Statement readStatement : readStatements) {
+ readStatement.setMaxRows(max);
+ }
+ maxRows = max;
+ }
+
+ @Override
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ writeStatement.setEscapeProcessing(enable);
+ for (Statement readStatement : readStatements) {
+ readStatement.setEscapeProcessing(enable);
+ }
+ }
+
+ @Override
+ public int getQueryTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void setQueryTimeout(int seconds) throws SQLException {}
+
+ @Override
+ public void cancel() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCursorName(String name) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ return writeStatement.execute(sql);
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ throw new UnsupportedOperationException(
+ "In integration test you must use Statement.executeQuery() to query data");
+ }
+
+ @Override
+ public int getUpdateCount() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public boolean getMoreResults() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {}
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {}
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int getResultSetConcurrency() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int getResultSetType() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void addBatch(String sql) throws SQLException {
+ writeStatement.addBatch(sql);
+ }
+
+ @Override
+ public void clearBatch() throws SQLException {
+ writeStatement.clearBatch();
+ }
+
+ @Override
+ public int[] executeBatch() throws SQLException {
+ return writeStatement.executeBatch();
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean getMoreResults(int current) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getGeneratedKeys() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return closed;
+ }
+
+ @Override
+ public void setPoolable(boolean poolable) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isPoolable() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void closeOnCompletion() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
index f3bd89cd8b..3937c8b51a 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
@@ -129,13 +129,10 @@ public class IoTDBAggregationIT {
String[] retArray = new String[] {"0,2", "0,4", "0,3"};
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
-
- boolean hasResultSet =
- statement.execute("SELECT count(temperature) FROM root.ln.wf01.wt01 WHERE time > 3");
-
- Assert.assertTrue(hasResultSet);
int cnt;
- try (ResultSet resultSet = statement.getResultSet()) {
+ try (ResultSet resultSet =
+ statement.executeQuery(
+ "SELECT count(temperature) FROM root.ln.wf01.wt01 WHERE time > 3")) {
cnt = 0;
while (resultSet.next()) {
String ans =
@@ -148,11 +145,9 @@ public class IoTDBAggregationIT {
Assert.assertEquals(1, cnt);
}
- hasResultSet =
- statement.execute(
- "SELECT count(temperature) FROM root.ln.wf01.wt01 WHERE time > 3 order by time desc");
- Assert.assertTrue(hasResultSet);
- try (ResultSet resultSet = statement.getResultSet()) {
+ try (ResultSet resultSet =
+ statement.executeQuery(
+ "SELECT count(temperature) FROM root.ln.wf01.wt01 WHERE time > 3 order by time desc")) {
cnt = 0;
while (resultSet.next()) {
String ans =
@@ -165,11 +160,9 @@ public class IoTDBAggregationIT {
Assert.assertEquals(1, cnt);
}
- hasResultSet =
- statement.execute("SELECT min_time(temperature) FROM root.ln.wf01.wt01 WHERE time > 3");
-
- Assert.assertTrue(hasResultSet);
- try (ResultSet resultSet = statement.getResultSet()) {
+ try (ResultSet resultSet =
+ statement.executeQuery(
+ "SELECT min_time(temperature) FROM root.ln.wf01.wt01 WHERE time > 3")) {
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR)
@@ -181,12 +174,9 @@ public class IoTDBAggregationIT {
Assert.assertEquals(2, cnt);
}
- hasResultSet =
- statement.execute(
- "SELECT min_time(temperature) FROM root.ln.wf01.wt01 WHERE temperature > 3");
-
- Assert.assertTrue(hasResultSet);
- try (ResultSet resultSet = statement.getResultSet()) {
+ try (ResultSet resultSet =
+ statement.executeQuery(
+ "SELECT min_time(temperature) FROM root.ln.wf01.wt01 WHERE temperature > 3")) {
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 5949a128d6..1e1153a02f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -161,7 +161,11 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
/** Thrift RPC implementation at server side. */
public class TSServiceImpl implements TSIEventHandler {
- private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+ private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+ public SessionManager getSessionManager() {
+ return SESSION_MANAGER;
+ }
private class QueryTask implements Callable<TSExecuteStatementResp> {
@@ -203,13 +207,13 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSExecuteStatementResp call() throws Exception {
- String username = SESSION_MANAGER.getUsername(sessionId);
+ String username = getSessionManager().getUsername(sessionId);
plan.setLoginUserName(username);
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
- final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
+ final long queryId = getSessionManager().requestQueryId(statementId, true);
QueryContext context =
serviceProvider.genQueryContext(
queryId, plan.isDebug(), queryStartTime, statement, timeout);
@@ -226,7 +230,7 @@ public class TSServiceImpl implements TSIEventHandler {
resp.setQueryId(queryId);
resp.setOperationType(plan.getOperatorType().toString());
} catch (Exception e) {
- SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+ getSessionManager().releaseQueryResourceNoExceptions(queryId);
throw e;
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, queryStartTime);
@@ -255,15 +259,16 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSFetchResultsResp call() throws Exception {
- QueryDataSet queryDataSet = SESSION_MANAGER.getDataset(queryId);
+ QueryDataSet queryDataSet = getSessionManager().getDataset(queryId);
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
try {
if (isAlign) {
TSQueryDataSet result =
- fillRpcReturnData(fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+ fillRpcReturnData(
+ fetchSize, queryDataSet, getSessionManager().getUsername(sessionId));
boolean hasResultSet = result.bufferForTime().limit() != 0;
if (!hasResultSet) {
- SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+ getSessionManager().releaseQueryResourceNoExceptions(queryId);
}
resp.setHasResultSet(hasResultSet);
resp.setQueryDataSet(result);
@@ -271,7 +276,7 @@ public class TSServiceImpl implements TSIEventHandler {
} else {
TSQueryNonAlignDataSet nonAlignResult =
fillRpcNonAlignReturnData(
- fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+ fetchSize, queryDataSet, getSessionManager().getUsername(sessionId));
boolean hasResultSet = false;
for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
if (timeBuffer.limit() != 0) {
@@ -280,7 +285,7 @@ public class TSServiceImpl implements TSIEventHandler {
}
}
if (!hasResultSet) {
- SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+ getSessionManager().releaseQueryResourceNoExceptions(queryId);
}
resp.setHasResultSet(hasResultSet);
resp.setNonAlignQueryDataSet(nonAlignResult);
@@ -289,7 +294,7 @@ public class TSServiceImpl implements TSIEventHandler {
QUERY_TIME_MANAGER.unRegisterQuery(queryId, false);
return resp;
} catch (Exception e) {
- SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+ getSessionManager().releaseQueryResourceNoExceptions(queryId);
throw e;
}
}
@@ -312,8 +317,9 @@ public class TSServiceImpl implements TSIEventHandler {
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
BasicOpenSessionResp openSessionResp =
- SESSION_MANAGER.openSession(
- req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+ getSessionManager()
+ .openSession(
+ req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
return resp.setSessionId(openSessionResp.getSessionId());
@@ -330,7 +336,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
return new TSStatus(
- !SESSION_MANAGER.closeSession(req.sessionId)
+ !getSessionManager().closeSession(req.sessionId)
? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
@@ -343,15 +349,20 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus closeOperation(TSCloseOperationReq req) {
- return SESSION_MANAGER.closeOperation(
- req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+ return getSessionManager()
+ .closeOperation(
+ req.sessionId,
+ req.queryId,
+ req.statementId,
+ req.isSetStatementId(),
+ req.isSetQueryId());
}
@Override
public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
TSFetchMetadataResp resp = new TSFetchMetadataResp();
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return resp.setStatus(getNotLoggedInStatus());
}
@@ -497,7 +508,7 @@ public class TSServiceImpl implements TSIEventHandler {
long t1 = System.currentTimeMillis();
List<TSStatus> result = new ArrayList<>();
boolean isAllSuccessful = true;
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
@@ -514,8 +525,8 @@ public class TSServiceImpl implements TSIEventHandler {
.getPlanner()
.parseSQLToPhysicalPlan(
statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ getSessionManager().getZoneId(req.sessionId),
+ getSessionManager().getClientVersion(req.sessionId));
if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
throw new QueryInBatchStatementException(statement);
}
@@ -529,7 +540,7 @@ public class TSServiceImpl implements TSIEventHandler {
index = 0;
}
- TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(physicalPlan, req.getSessionId());
if (status != null) {
insertRowsPlan.getResults().put(index, status);
isAllSuccessful = false;
@@ -551,7 +562,7 @@ public class TSServiceImpl implements TSIEventHandler {
multiPlan = new CreateMultiTimeSeriesPlan();
executeList.add(multiPlan);
}
- TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(physicalPlan, req.getSessionId());
if (status != null) {
multiPlan.getResults().put(i, status);
isAllSuccessful = false;
@@ -603,7 +614,7 @@ public class TSServiceImpl implements TSIEventHandler {
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
String statement = req.getStatement();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -613,8 +624,8 @@ public class TSServiceImpl implements TSIEventHandler {
.getPlanner()
.parseSQLToPhysicalPlan(
statement,
- SESSION_MANAGER.getZoneId(req.getSessionId()),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ getSessionManager().getZoneId(req.getSessionId()),
+ getSessionManager().getClientVersion(req.sessionId));
if (physicalPlan.isQuery()) {
return submitQueryTask(physicalPlan, startTime, req);
@@ -641,7 +652,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -652,8 +663,8 @@ public class TSServiceImpl implements TSIEventHandler {
.getPlanner()
.parseSQLToPhysicalPlan(
statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ getSessionManager().getZoneId(req.sessionId),
+ getSessionManager().getClientVersion(req.sessionId));
if (physicalPlan.isQuery()) {
return submitQueryTask(physicalPlan, startTime, req);
@@ -677,7 +688,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -687,8 +698,8 @@ public class TSServiceImpl implements TSIEventHandler {
.getPlanner()
.rawDataQueryReqToPhysicalPlan(
req,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ getSessionManager().getZoneId(req.sessionId),
+ getSessionManager().getClientVersion(req.sessionId));
if (physicalPlan.isQuery()) {
Future<TSExecuteStatementResp> resp =
@@ -723,7 +734,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -733,8 +744,8 @@ public class TSServiceImpl implements TSIEventHandler {
.getPlanner()
.lastDataQueryReqToPhysicalPlan(
req,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ getSessionManager().getZoneId(req.sessionId),
+ getSessionManager().getClientVersion(req.sessionId));
if (physicalPlan.isQuery()) {
Future<TSExecuteStatementResp> resp =
@@ -797,7 +808,7 @@ public class TSServiceImpl implements TSIEventHandler {
List<? extends PartialPath> authPaths = plan.getAuthPaths();
if (authPaths != null
&& !authPaths.isEmpty()
- && !SESSION_MANAGER.checkAuthorization(plan, username)) {
+ && !getSessionManager().checkAuthorization(plan, username)) {
return RpcUtils.getTSExecuteStatementResp(
RpcUtils.getStatus(
TSStatusCode.NO_PERMISSION_ERROR,
@@ -919,13 +930,13 @@ public class TSServiceImpl implements TSIEventHandler {
long sessionId)
throws IoTDBException, TException, SQLException, IOException, InterruptedException,
QueryFilterOptimizationException {
- TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, sessionId);
+ TSStatus status = getSessionManager().checkAuthority(physicalPlan, sessionId);
if (status != null) {
return new TSExecuteStatementResp(status);
}
final long startTime = System.currentTimeMillis();
- final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
+ final long queryId = getSessionManager().requestQueryId(statementId, true);
QueryContext context =
serviceProvider.genQueryContext(
queryId, physicalPlan.isDebug(), startTime, statement, timeout);
@@ -934,7 +945,7 @@ public class TSServiceImpl implements TSIEventHandler {
QUERY_FREQUENCY_RECORDER.incrementAndGet();
AUDIT_LOGGER.debug(
- "Session {} execute select into: {}", SESSION_MANAGER.getCurrSessionId(), statement);
+ "Session {} execute select into: {}", getSessionManager().getCurrSessionId(), statement);
if (queryPlan.isEnableTracing()) {
TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size());
}
@@ -961,7 +972,7 @@ public class TSServiceImpl implements TSIEventHandler {
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS).setQueryId(queryId);
} finally {
- SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
+ getSessionManager().releaseQueryResourceNoExceptions(queryId);
addOperationLatency(Operation.EXECUTE_SELECT_INTO, startTime);
long costTime = System.currentTimeMillis() - startTime;
if (costTime >= CONFIG.getSlowQueryThreshold()) {
@@ -975,7 +986,7 @@ public class TSServiceImpl implements TSIEventHandler {
InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
for (int i = 0; i < insertTabletPlans.size(); i++) {
InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
- TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, sessionId);
+ TSStatus status = getSessionManager().checkAuthority(insertTabletPlan, sessionId);
if (status != null) {
// not authorized
@@ -991,11 +1002,11 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}
- if (!SESSION_MANAGER.hasDataset(req.queryId)) {
+ if (!getSessionManager().hasDataset(req.queryId)) {
return RpcUtils.getTSFetchResultsResp(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
}
@@ -1057,7 +1068,7 @@ public class TSServiceImpl implements TSIEventHandler {
/** update statement can be: 1. select-into statement 2. non-query statement */
@Override
public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
@@ -1067,8 +1078,8 @@ public class TSServiceImpl implements TSIEventHandler {
.getPlanner()
.parseSQLToPhysicalPlan(
req.statement,
- SESSION_MANAGER.getZoneId(req.sessionId),
- SESSION_MANAGER.getClientVersion(req.sessionId));
+ getSessionManager().getZoneId(req.sessionId),
+ getSessionManager().getClientVersion(req.sessionId));
return physicalPlan.isQuery()
? RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1108,16 +1119,16 @@ public class TSServiceImpl implements TSIEventHandler {
}
private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan, long sessionId) {
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ TSStatus status = getSessionManager().checkAuthority(plan, sessionId);
return status != null
? new TSExecuteStatementResp(status)
: RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan))
- .setQueryId(SESSION_MANAGER.requestQueryId(false));
+ .setQueryId(getSessionManager().requestQueryId(false));
}
@Override
public void handleClientExit() {
- Long sessionId = SESSION_MANAGER.getCurrSessionId();
+ Long sessionId = getSessionManager().getCurrSessionId();
if (sessionId != null) {
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
closeSession(req);
@@ -1127,7 +1138,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSGetTimeZoneResp getTimeZone(long sessionId) {
try {
- ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+ ZoneId zoneId = getSessionManager().getZoneId(sessionId);
return new TSGetTimeZoneResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1142,7 +1153,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus setTimeZone(TSSetTimeZoneReq req) {
try {
- SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+ getSessionManager().setTimezone(req.sessionId, req.timeZone);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
return onNPEOrUnexpectedException(
@@ -1178,14 +1189,14 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus insertRecords(TSInsertRecordsReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
@@ -1200,7 +1211,7 @@ public class TSServiceImpl implements TSIEventHandler {
req.getMeasurementsList().get(i).toArray(new String[0]),
req.valuesList.get(i),
req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
if (status != null) {
insertRowsPlan.getResults().put(i, status);
allCheckSuccess = false;
@@ -1250,14 +1261,14 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.prefixPath,
req.getTimestamps().get(0));
}
@@ -1271,7 +1282,7 @@ public class TSServiceImpl implements TSIEventHandler {
req.getMeasurementsList(),
req.getValuesList(),
req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
statusList.add(status != null ? status : executeNonQueryPlan(plan));
} catch (IoTDBException e) {
statusList.add(
@@ -1296,14 +1307,14 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.prefixPath,
req.getTimestamps().get(0));
}
@@ -1319,7 +1330,7 @@ public class TSServiceImpl implements TSIEventHandler {
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
if (status != null) {
insertRowsPlan.getResults().put(i, status);
@@ -1354,14 +1365,14 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.prefixPaths.get(0),
req.getTimestamps().get(0));
}
@@ -1377,7 +1388,7 @@ public class TSServiceImpl implements TSIEventHandler {
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
if (status != null) {
insertRowsPlan.getResults().put(i, status);
@@ -1468,13 +1479,13 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus insertRecord(TSInsertRecordReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.getPrefixPath(),
req.getTimestamp());
@@ -1485,7 +1496,7 @@ public class TSServiceImpl implements TSIEventHandler {
req.getMeasurements().toArray(new String[0]),
req.values,
req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORD, e.getErrorCode());
@@ -1498,13 +1509,13 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.getPrefixPath(),
req.getTimestamp());
@@ -1516,7 +1527,7 @@ public class TSServiceImpl implements TSIEventHandler {
plan.setValues(req.getValues().toArray(new Object[0]));
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, e.getErrorCode());
@@ -1529,7 +1540,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
@@ -1541,7 +1552,7 @@ public class TSServiceImpl implements TSIEventHandler {
paths.add(new PartialPath(path));
}
plan.addPaths(paths);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
} catch (IoTDBException e) {
@@ -1556,7 +1567,7 @@ public class TSServiceImpl implements TSIEventHandler {
public TSStatus insertTablet(TSInsertTabletReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
@@ -1571,7 +1582,7 @@ public class TSServiceImpl implements TSIEventHandler {
insertTabletPlan.setRowCount(req.size);
insertTabletPlan.setDataTypes(req.types);
insertTabletPlan.setAligned(req.isAligned);
- TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(insertTabletPlan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(insertTabletPlan);
} catch (IoTDBException e) {
@@ -1588,7 +1599,7 @@ public class TSServiceImpl implements TSIEventHandler {
public TSStatus insertTablets(TSInsertTabletsReq req) {
long t1 = System.currentTimeMillis();
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
@@ -1633,7 +1644,7 @@ public class TSServiceImpl implements TSIEventHandler {
InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
for (int i = 0; i < req.prefixPaths.size(); i++) {
InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i);
- TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(insertTabletPlan, req.getSessionId());
if (status != null) {
// not authorized
insertMultiTabletsPlan.getResults().put(i, status);
@@ -1648,12 +1659,12 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus setStorageGroup(long sessionId, String storageGroup) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!getSessionManager().checkLogin(sessionId)) {
return getNotLoggedInStatus();
}
SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ TSStatus status = getSessionManager().checkAuthority(plan, sessionId);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
@@ -1667,7 +1678,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!getSessionManager().checkLogin(sessionId)) {
return getNotLoggedInStatus();
}
@@ -1676,7 +1687,7 @@ public class TSServiceImpl implements TSIEventHandler {
storageGroupList.add(new PartialPath(storageGroup));
}
DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ TSStatus status = getSessionManager().checkAuthority(plan, sessionId);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode());
@@ -1689,13 +1700,15 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
- "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+ "Session-{} create timeseries {}",
+ getSessionManager().getCurrSessionId(),
+ req.getPath());
}
CreateTimeSeriesPlan plan =
@@ -1708,7 +1721,7 @@ public class TSServiceImpl implements TSIEventHandler {
req.tags,
req.attributes,
req.measurementAlias);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode());
@@ -1721,7 +1734,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
@@ -1739,7 +1752,7 @@ public class TSServiceImpl implements TSIEventHandler {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create aligned timeseries {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.getPrefixPath(),
req.getMeasurements());
}
@@ -1767,7 +1780,7 @@ public class TSServiceImpl implements TSIEventHandler {
req.measurementAlias,
req.tagsList,
req.attributesList);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode());
@@ -1781,14 +1794,14 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create {} timeseries, the first is {}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.getPaths().size(),
req.getPaths().get(0));
}
@@ -1819,7 +1832,7 @@ public class TSServiceImpl implements TSIEventHandler {
CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
for (int i = 0; i < req.paths.size(); i++) {
plan.setPath(new PartialPath(req.paths.get(i)));
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
if (status != null) {
// not authorized
multiPlan.getResults().put(i, status);
@@ -1868,7 +1881,7 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus deleteTimeseries(long sessionId, List<String> paths) {
try {
- if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ if (!getSessionManager().checkLogin(sessionId)) {
return getNotLoggedInStatus();
}
@@ -1877,7 +1890,7 @@ public class TSServiceImpl implements TSIEventHandler {
pathList.add(new PartialPath(path));
}
DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+ TSStatus status = getSessionManager().checkAuthority(plan, sessionId);
return status != null ? status : executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode());
@@ -1889,20 +1902,20 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public long requestStatementId(long sessionId) {
- return SESSION_MANAGER.requestStatementId(sessionId);
+ return getSessionManager().requestStatementId(sessionId);
}
@Override
public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
try {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create schema template {}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.getName());
}
@@ -1910,7 +1923,7 @@ public class TSServiceImpl implements TSIEventHandler {
// Construct plan from serialized request
ByteBuffer buffer = ByteBuffer.wrap(req.getSerializedTemplate());
plan = CreateTemplatePlan.deserializeFromReq(buffer);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (Exception e) {
@@ -1937,7 +1950,7 @@ public class TSServiceImpl implements TSIEventHandler {
AppendTemplatePlan plan =
new AppendTemplatePlan(
req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
}
@@ -1945,7 +1958,7 @@ public class TSServiceImpl implements TSIEventHandler {
public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
PruneTemplatePlan plan =
new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
}
@@ -1999,21 +2012,21 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} set device template {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.getTemplateName(),
req.getPrefixPath());
}
try {
SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IllegalPathException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2022,21 +2035,21 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} unset schema template {}.{}",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.getPrefixPath(),
req.getTemplateName());
}
try {
UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
} catch (IllegalPathException e) {
return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2045,19 +2058,19 @@ public class TSServiceImpl implements TSIEventHandler {
@Override
public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
- if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ if (!getSessionManager().checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} drop schema template {}.",
- SESSION_MANAGER.getCurrSessionId(),
+ getSessionManager().getCurrSessionId(),
req.getTemplateName());
}
DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
- TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+ TSStatus status = getSessionManager().checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
}