You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/01 09:06:30 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] upgrade audit log (#8031)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 510c8ee223 [To rel/0.13] upgrade audit log (#8031)
510c8ee223 is described below
commit 510c8ee22348c7f2e76be7f75b5a6acf1aa482a8
Author: Zhijia Cao <ca...@126.com>
AuthorDate: Thu Dec 1 17:06:24 2022 +0800
[To rel/0.13] upgrade audit log (#8031)
---
.../server/basic/ClusterServiceProvider.java | 5 -
.../main/java/org/apache/iotdb/jdbc/Config.java | 4 +
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 7 +-
.../apache/iotdb/jdbc/IoTDBConnectionParams.java | 10 ++
.../src/main/java/org/apache/iotdb/jdbc/Utils.java | 3 +
.../test/java/org/apache/iotdb/jdbc/UtilsTest.java | 10 ++
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 6 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 40 +++--
.../iotdb/db/query/control/SessionManager.java | 4 +-
.../query/control/clientsession/ClientSession.java | 2 +-
.../control/clientsession/IClientSession.java | 10 ++
.../java/org/apache/iotdb/db/service/IoTDB.java | 3 +-
.../iotdb/db/service/basic/ServiceProvider.java | 53 ++++--
.../service/basic/StandaloneServiceProvider.java | 5 -
.../db/service/thrift/impl/TSServiceImpl.java | 197 ++++++++++++---------
.../org/apache/iotdb/db/utils/AuditLogUtils.java | 50 +++---
.../main/java/org/apache/iotdb/session/Config.java | 2 +
.../java/org/apache/iotdb/session/Session.java | 87 ++++++++-
.../apache/iotdb/session/SessionConnection.java | 6 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 105 ++++++++++-
20 files changed, 451 insertions(+), 158 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/basic/ClusterServiceProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/basic/ClusterServiceProvider.java
index 37d579e21b..be515775f4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/basic/ClusterServiceProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/basic/ClusterServiceProvider.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.basic.ServiceProvider;
-import org.apache.iotdb.db.utils.AuditLogUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -90,10 +89,6 @@ public class ClusterServiceProvider extends ServiceProvider {
@Override
public boolean executeNonQuery(PhysicalPlan plan) {
- AuditLogUtils.writeAuditLog(
- plan.getOperatorName(),
- String.format(
- "measurements size:%s", plan.getPaths() == null ? 0 : plan.getPaths().size()));
TSStatus tsStatus = executeNonQueryPlan(plan);
return tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 0fa3cfb648..74e6fbd52e 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -56,4 +56,8 @@ public class Config {
/** key of thrift max frame size */
public static final String THRIFT_FRAME_MAX_SIZE = "thrift_max_frame_size";
+
+ public static final String AUTH_ENABLE_AUDIT = "enableAudit";
+
+ public static final boolean DEFAULT_ENABLE_AUDIT = true;
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index dc0cc05d97..1137926a68 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -59,6 +59,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
+import static org.apache.iotdb.jdbc.Config.AUTH_ENABLE_AUDIT;
+import static org.apache.iotdb.jdbc.Config.VERSION;
+
public class IoTDBConnection implements Connection {
private static final Logger logger = LoggerFactory.getLogger(IoTDBConnection.class);
@@ -463,8 +466,8 @@ public class IoTDBConnection implements Connection {
openReq.setUsername(params.getUsername());
openReq.setPassword(params.getPassword());
openReq.setZoneId(getTimeZone());
- openReq.putToConfiguration("version", params.getVersion().toString());
-
+ openReq.putToConfiguration(VERSION, params.getVersion().toString());
+ openReq.putToConfiguration(AUTH_ENABLE_AUDIT, String.valueOf(params.isEnableAudit()));
TSOpenSessionResp openResp = null;
try {
openResp = client.openSession(openReq);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
index 0eb86fed10..cbf2811aab 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
@@ -35,6 +35,8 @@ public class IoTDBConnectionParams {
private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
private int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;
+ private boolean enableAudit = Config.DEFAULT_ENABLE_AUDIT;
+
public IoTDBConnectionParams(String url) {
this.jdbcUriString = url;
}
@@ -110,4 +112,12 @@ public class IoTDBConnectionParams {
public void setVersion(Constant.Version version) {
this.version = version;
}
+
+ public boolean isEnableAudit() {
+ return enableAudit;
+ }
+
+ public void setEnableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ }
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
index 17c30a17a2..1a76c5ada5 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
@@ -74,6 +74,9 @@ public class Utils {
if (info.containsKey(Config.VERSION)) {
params.setVersion(Constant.Version.valueOf(info.getProperty(Config.VERSION)));
}
+ if (info.containsKey(Config.AUTH_ENABLE_AUDIT)) {
+ params.setEnableAudit(Boolean.parseBoolean(info.getProperty(Config.AUTH_ENABLE_AUDIT)));
+ }
return params;
}
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java
index 1ab1a95d04..1d4a94cf6d 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -144,4 +145,13 @@ public class UtilsTest {
Utils.parseUrl("jdbc:iotdb://127.0.0.1:6667?rpc_compress=true", properties);
assertTrue(Config.rpcThriftCompressionEnable);
}
+
+ @Test
+ public void testParseEnableAudit() throws IoTDBURLException {
+ Properties properties = new Properties();
+ properties.put("enableAudit", "false");
+ IoTDBConnectionParams ioTDBConnectionParams =
+ Utils.parseUrl("jdbc:iotdb://127.0.0.1:6667", properties);
+ assertFalse(ioTDBConnectionParams.isEnableAudit());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index c5a23bed7a..06ea17ca57 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -208,6 +208,12 @@ public class IoTDBConstant {
// compaction mods of previous version (<0.13)
public static final String COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD = "merge.mods";
+ public static final String SYSTEM_STORAGE_GROUP = "root.__system";
+
+ public static final String CONSTANT_VERSION = "version";
+
+ public static final String AUTH_ENABLE_AUDIT = "enableAudit";
+
// client version number
public enum ClientVersion {
V_0_12,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a6989293c5..1345cf89f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -151,6 +151,7 @@ import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.SettleService;
import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.db.utils.AuditLogUtils;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
@@ -247,8 +248,10 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
public class PlanExecutor implements IPlanExecutor {
private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
- private static final Logger AUDIT_LOGGER =
- LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+ private static boolean enableAuditLog =
+ !AuditLogUtils.LOG_LEVEL_NONE.equals(
+ IoTDBDescriptor.getInstance().getConfig().getAuditLogStorage());
+
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
// for data query
protected IQueryRouter queryRouter;
@@ -520,7 +523,9 @@ public class PlanExecutor implements IPlanExecutor {
}
// delete related data
- AUDIT_LOGGER.info("delete timeseries {}", pathToDelete);
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(String.format("delete timeseries %s", pathToDelete));
+ }
DeleteTimeSeriesPlan dtsp = new DeleteTimeSeriesPlan(pathToDelete);
for (PartialPath path : pathToDelete) {
StorageEngine.getInstance()
@@ -1377,11 +1382,15 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public void delete(DeletePlan deletePlan) throws QueryProcessException {
- AUDIT_LOGGER.info(
- "delete data from {} in [{},{}]",
- deletePlan.getPaths(),
- deletePlan.getDeleteStartTime(),
- deletePlan.getDeleteEndTime());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "delete data from %s in [%s,%s]",
+ deletePlan.getPaths(),
+ deletePlan.getDeleteStartTime(),
+ deletePlan.getDeleteEndTime()));
+ }
+
for (PartialPath path : deletePlan.getPaths()) {
delete(
path,
@@ -2149,7 +2158,10 @@ public class PlanExecutor implements IPlanExecutor {
protected boolean deleteTimeSeries(DeleteTimeSeriesPlan deleteTimeSeriesPlan)
throws QueryProcessException {
- AUDIT_LOGGER.info("delete timeseries {}", deleteTimeSeriesPlan.getPaths());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("delete timeseries %s", deleteTimeSeriesPlan.getPaths()));
+ }
List<PartialPath> deletePathList = deleteTimeSeriesPlan.getPaths();
for (int i = 0; i < deletePathList.size(); i++) {
PartialPath path = deletePathList.get(i);
@@ -2228,7 +2240,10 @@ public class PlanExecutor implements IPlanExecutor {
public boolean setStorageGroup(SetStorageGroupPlan setStorageGroupPlan)
throws QueryProcessException {
- AUDIT_LOGGER.info("set storage group to {}", setStorageGroupPlan.getPaths());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("set storage group to %s", setStorageGroupPlan.getPaths()));
+ }
PartialPath path = setStorageGroupPlan.getPath();
try {
IoTDB.metaManager.setStorageGroup(path);
@@ -2240,7 +2255,10 @@ public class PlanExecutor implements IPlanExecutor {
protected boolean deleteStorageGroups(DeleteStorageGroupPlan deleteStorageGroupPlan)
throws QueryProcessException {
- AUDIT_LOGGER.info("delete storage group {}", deleteStorageGroupPlan.getPaths());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("set storage group to %s", deleteStorageGroupPlan.getPaths()));
+ }
List<PartialPath> deletePathList = new ArrayList<>();
try {
for (PartialPath storageGroupPath : deleteStorageGroupPlan.getPaths()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 294be39662..afab3218d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -140,13 +140,15 @@ public class SessionManager implements SessionManagerMBean {
IClientSession session,
String username,
String zoneId,
- IoTDBConstant.ClientVersion clientVersion) {
+ IoTDBConstant.ClientVersion clientVersion,
+ boolean enableAudit) {
session.setId(sessionIdGenerator.incrementAndGet());
session.setUsername(username);
session.setZoneId(ZoneId.of(zoneId));
session.setClientVersion(clientVersion);
session.setLogin(true);
session.setLogInTime(System.currentTimeMillis());
+ session.setEnableAudit(enableAudit);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
index 0622715787..00b86245da 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -43,7 +43,7 @@ public class ClientSession extends IClientSession {
}
@Override
- int getClientPort() {
+ public int getClientPort() {
return clientSocket.getPort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
index e55e4a4781..a2d4707682 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -44,6 +44,8 @@ public abstract class IClientSession {
private long logInTime;
+ private boolean enableAudit;
+
abstract String getClientAddress();
abstract int getClientPort();
@@ -113,6 +115,14 @@ public abstract class IClientSession {
this.id = id;
}
+ public boolean isEnableAudit() {
+ return enableAudit;
+ }
+
+ public void setEnableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ }
+
public String toString() {
return String.format(
"%d-%s:%s:%d", getId(), getUsername(), getClientAddress(), getClientPort());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index ebfd4487f9..4a7c4b3b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -61,7 +61,7 @@ public class IoTDB implements IoTDBMBean {
private final String mbeanName =
String.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "IoTDB");
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private static final RegisterManager registerManager = new RegisterManager();
+ protected static final RegisterManager registerManager = new RegisterManager();
public static MManager metaManager = MManager.getInstance();
public static ServiceProvider serviceProvider;
public static volatile boolean activated = false;
@@ -193,7 +193,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(TriggerRegistrationService.getInstance());
registerManager.register(ContinuousQueryService.getInstance());
registerManager.register(MetricService.getInstance());
-
logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
index 630e8d3dad..65e153d812 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
@@ -67,8 +67,7 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedExce
public abstract class ServiceProvider {
protected static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
- public static final Logger AUDIT_LOGGER =
- LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+
public static final Logger SLOW_SQL_LOGGER =
LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
@@ -84,6 +83,9 @@ public abstract class ServiceProvider {
public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
+ private static final boolean enableAuditLog =
+ !AuditLogUtils.LOG_LEVEL_NONE.equals(CONFIG.getAuditLogStorage());
+
private final Planner planner;
protected final IPlanExecutor executor;
@@ -173,7 +175,8 @@ public abstract class ServiceProvider {
String password,
String zoneId,
TSProtocolVersion tsProtocolVersion,
- IoTDBConstant.ClientVersion clientVersion)
+ IoTDBConstant.ClientVersion clientVersion,
+ boolean enableAudit)
throws TException {
BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
@@ -206,21 +209,26 @@ public abstract class ServiceProvider {
openSessionResp.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
openSessionResp.setMessage("Login successfully");
- SESSION_MANAGER.supplySession(session, username, zoneId, clientVersion);
- LOGGER.info(
- "{}: Login status: {}. User : {}, opens Session-{}",
- IoTDBConstant.GLOBAL_DB_NAME,
- openSessionResp.getMessage(),
- username,
- session);
+ SESSION_MANAGER.supplySession(session, username, zoneId, clientVersion, enableAudit);
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "%s: Login status: %s. User : %s, opens Session-%s",
+ IoTDBConstant.GLOBAL_DB_NAME, openSessionResp.getMessage(), username, session));
+ }
+
} else {
openSessionResp.setMessage(loginMessage != null ? loginMessage : "Authentication failed.");
openSessionResp.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode());
- AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username);
+ session.setUsername(username);
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("User %s opens Session failed with an incorrect password", username),
+ true);
+ }
// TODO we should close this connection ASAP, otherwise there will be DDoS.
}
SessionTimeoutManager.getInstance().register(session);
- AuditLogUtils.writeAuditLog(AuditLogUtils.TYPE_LOGIN, "user login");
return openSessionResp.sessionId(session == null ? -1 : session.getId());
}
@@ -232,12 +240,19 @@ public abstract class ServiceProvider {
TSProtocolVersion tsProtocolVersion)
throws TException {
return login(
- session, username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
+ session,
+ username,
+ password,
+ zoneId,
+ tsProtocolVersion,
+ IoTDBConstant.ClientVersion.V_0_12,
+ false);
}
public boolean closeSession(IClientSession session) {
- AUDIT_LOGGER.info("Session-{} is closing", session);
- AuditLogUtils.writeAuditLog(AuditLogUtils.TYPE_LOGOUT, "user logout");
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(String.format("Session-%s is closing", session));
+ }
return SessionTimeoutManager.getInstance().unregister(session);
}
@@ -255,9 +270,11 @@ public abstract class ServiceProvider {
if (checkSessionTimeout(session)) {
return RpcUtils.getStatus(TSStatusCode.SESSION_TIMEOUT, "Session timeout");
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "{}: receive close operation from Session {}", IoTDBConstant.GLOBAL_DB_NAME, session);
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "%s: receive close operation from Session %s",
+ IoTDBConstant.GLOBAL_DB_NAME, session));
}
try {
if (haveStatementId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
index 8ad1f16e60..d9cc7cfdc5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.utils.AuditLogUtils;
public class StandaloneServiceProvider extends ServiceProvider {
@@ -51,10 +50,6 @@ public class StandaloneServiceProvider extends ServiceProvider {
&& IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new StorageEngineReadonlyException();
}
- AuditLogUtils.writeAuditLog(
- plan.getOperatorName(),
- String.format(
- "measurements size:%s", plan.getPaths() == null ? 0 : plan.getPaths().size()));
return executor.processNonQuery(plan);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 3f98f7a06d..41ee38f4de 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -159,7 +159,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.AUTH_ENABLE_AUDIT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.CONSTANT_VERSION;
+import static org.apache.iotdb.db.conf.IoTDBConstant.SYSTEM_STORAGE_GROUP;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
@@ -178,6 +180,11 @@ public class TSServiceImpl implements TSIService.Iface {
private static final boolean isEnableOperationSync =
IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
+ private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+
+ private static final boolean enableAuditLog =
+ !AuditLogUtils.LOG_LEVEL_NONE.equals(conf.getAuditLogStorage());
+
protected class QueryTask implements Callable<TSExecuteStatementResp> {
private PhysicalPlan plan;
@@ -222,8 +229,10 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setLoginUserName(username);
QUERY_FREQUENCY_RECORDER.incrementAndGet();
- AUDIT_LOGGER.debug("Session {} execute Query: {}", session, statement);
-
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("Session %s execute Query: %s", session, statement));
+ }
final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
QueryContext context =
serviceProvider.genQueryContext(
@@ -324,6 +333,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
+ boolean enableAudit = parseEnableAudit(req);
BasicOpenSessionResp openSessionResp =
serviceProvider.login(
SESSION_MANAGER.getCurrSession(),
@@ -331,7 +341,8 @@ public class TSServiceImpl implements TSIService.Iface {
req.password,
req.zoneId,
req.client_protocol,
- clientVersion);
+ clientVersion,
+ enableAudit);
TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
return resp.setSessionId(openSessionResp.getSessionId());
@@ -339,12 +350,20 @@ public class TSServiceImpl implements TSIService.Iface {
private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) {
Map<String, String> configuration = req.configuration;
- if (configuration != null && configuration.containsKey("version")) {
- return IoTDBConstant.ClientVersion.valueOf(configuration.get("version"));
+ if (configuration != null && configuration.containsKey(CONSTANT_VERSION)) {
+ return IoTDBConstant.ClientVersion.valueOf(configuration.get(CONSTANT_VERSION));
}
return IoTDBConstant.ClientVersion.V_0_12;
}
+ private boolean parseEnableAudit(TSOpenSessionReq req) {
+ Map<String, String> configuration = req.configuration;
+ if (configuration != null && configuration.containsKey(AUTH_ENABLE_AUDIT)) {
+ return Boolean.parseBoolean(configuration.get(AUTH_ENABLE_AUDIT));
+ }
+ return true;
+ }
+
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
return new TSStatus(
@@ -633,6 +652,9 @@ public class TSServiceImpl implements TSIService.Iface {
serviceProvider
.getPlanner()
.parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(req.getStatement());
+ }
if (physicalPlan.isQuery()) {
return submitQueryTask(session, physicalPlan, startTime, req);
@@ -665,7 +687,9 @@ public class TSServiceImpl implements TSIService.Iface {
serviceProvider
.getPlanner()
.parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
-
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(statement);
+ }
if (physicalPlan.isQuery()) {
return submitQueryTask(session, physicalPlan, startTime, req);
} else {
@@ -780,7 +804,6 @@ public class TSServiceImpl implements TSIService.Iface {
if (status != null) {
return new TSExecuteStatementResp(status);
}
- AuditLogUtils.writeAuditLog(AuditLogUtils.TYPE_QUERY, req.getStatement());
QueryTask queryTask =
new QueryTask(
physicalPlan,
@@ -948,7 +971,11 @@ public class TSServiceImpl implements TSIService.Iface {
final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
QUERY_FREQUENCY_RECORDER.incrementAndGet();
- AUDIT_LOGGER.debug("Session {} execute select into: {}", session, statement);
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("Session %s execute select into: %s", session, statement));
+ }
+
if (queryPlan.isEnableTracing()) {
TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size());
}
@@ -1199,13 +1226,14 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, first device {}, first time {}",
- session,
- req.prefixPaths.get(0),
- req.getTimestamps().get(0));
+
+ if (conf.isEnableAuditLogWrite() && enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session %s insertRecords, first device %s, first time %s",
+ session, req.prefixPaths.get(0), req.getTimestamps().get(0)));
}
+
boolean allCheckSuccess = true;
InsertRowsPlan insertRowsPlan = new InsertRowsPlan();
for (int i = 0; i < req.prefixPaths.size(); i++) {
@@ -1287,12 +1315,12 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, device {}, first time {}",
- session,
- req.prefixPath,
- req.getTimestamps().get(0));
+
+ if (conf.isEnableAuditLogWrite() && enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session %s insertRecords, device %s, first time %s",
+ session, req.prefixPath, req.getTimestamps().get(0)));
}
List<TSStatus> statusList = new ArrayList<>();
@@ -1334,12 +1362,12 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, device {}, first time {}",
- session,
- req.prefixPath,
- req.getTimestamps().get(0));
+
+ if (conf.isEnableAuditLogWrite() && enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session %s insertRecords, device %s, first time %s",
+ session, req.prefixPath, req.getTimestamps().get(0)));
}
boolean allCheckSuccess = true;
@@ -1393,12 +1421,12 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session {} insertRecords, first device {}, first time {}",
- session,
- req.prefixPaths.get(0),
- req.getTimestamps().get(0));
+
+ if (conf.isEnableAuditLogWrite() && enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session %s insertRecords, first device %s, first time %s",
+ session, req.prefixPaths.get(0), req.getTimestamps().get(0)));
}
boolean allCheckSuccess = true;
@@ -1508,11 +1536,14 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- AUDIT_LOGGER.debug(
- "Session {} insertRecord, device {}, time {}",
- session,
- req.getPrefixPath(),
- req.getTimestamp());
+ if (!req.getPrefixPath().startsWith(SYSTEM_STORAGE_GROUP)
+ && conf.isEnableAuditLogWrite()
+ && enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session %s insertRecord, device %s, time %s",
+ session, req.getPrefixPath(), req.getTimestamp()));
+ }
InsertRowPlan plan =
new InsertRowPlan(
@@ -1544,11 +1575,12 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- AUDIT_LOGGER.debug(
- "Session {} insertRecord, device {}, time {}",
- session,
- req.getPrefixPath(),
- req.getTimestamp());
+ if (conf.isEnableAuditLogWrite() && enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session %s insertRecord, device %s, time %s",
+ session, req.getPrefixPath(), req.getTimestamp()));
+ }
InsertRowPlan plan = new InsertRowPlan();
plan.setDevicePath(new PartialPath(req.getPrefixPath()));
@@ -1754,8 +1786,9 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug("Session-{} create timeseries {}", session, req.getPath());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("Session-%s create timeseries %s", session, req.getPath()));
}
CreateTimeSeriesPlan plan =
@@ -1786,13 +1819,11 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} create aligned timeseries {}.{}",
- session,
- req.getPrefixPath(),
- req.getMeasurements());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session-%s create aligned timeseries %s.%s",
+ session, req.getPrefixPath(), req.getMeasurements()));
}
List<TSDataType> dataTypes = new ArrayList<>();
@@ -1835,12 +1866,11 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} create {} timeseries, the first is {}",
- session,
- req.getPaths().size(),
- req.getPaths().get(0));
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session-%s create %s timeseries, the first is %s",
+ session, req.getPaths().size(), req.getPaths().get(0)));
}
CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan();
@@ -1953,8 +1983,9 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug("Session-{} create schema template {}", session, req.getName());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("Session-%s create schema template %s", session, req.getName()));
}
CreateTemplatePlan plan;
@@ -2058,12 +2089,11 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} set device template {}.{}",
- session,
- req.getTemplateName(),
- req.getPrefixPath());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session-%s set device template %s.%s",
+ session, req.getTemplateName(), req.getPrefixPath()));
}
try {
@@ -2082,14 +2112,12 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} unset schema template {}.{}",
- session,
- req.getPrefixPath(),
- req.getTemplateName());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session-%s unset schema template %s.%s",
+ session, req.getPrefixPath(), req.getTemplateName()));
}
-
try {
UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
TSStatus status = serviceProvider.checkAuthority(plan, session);
@@ -2107,10 +2135,11 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
-
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} unset using schema template {} on {}", session, templateName, prefixPath);
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session-%s unset using schema template %s on %s",
+ session, templateName, prefixPath));
}
try {
@@ -2130,9 +2159,11 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug(
- "Session-{} create timeseries of schema template on path {}", session, req.getDstPath());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format(
+ "Session-%s create timeseries of schema template on path %s",
+ session, req.getDstPath()));
}
try {
@@ -2151,8 +2182,9 @@ public class TSServiceImpl implements TSIService.Iface {
if (isStatusNotSuccess(loginStatus)) {
return loginStatus;
}
- if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug("Session-{} drop schema template {}.", session, req.getTemplateName());
+ if (enableAuditLog) {
+ AuditLogUtils.writeAuditLog(
+ String.format("Session-%s drop schema template %s.", session, req.getTemplateName()));
}
DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
@@ -2170,7 +2202,6 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus executeOperationSync(TSOperationSyncWriteReq req) {
- IClientSession session = SESSION_MANAGER.getCurrSession();
PhysicalPlan physicalPlan;
try {
ByteBuffer planBuffer = req.physicalPlan;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AuditLogUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/AuditLogUtils.java
index 27fd8639de..78002f6c42 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/AuditLogUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AuditLogUtils.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.IoTDB;
@@ -39,45 +40,44 @@ public class AuditLogUtils {
LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
public static final String LOG = "log";
- public static final String TYPE = "type";
public static final String USERNAME = "username";
+ public static final String ADDRESS = "address";
public static final String AUDIT_LOG_DEVICE = "root.__system.audit.'%s'";
- public static final String TYPE_QUERY = "QUERY";
- public static final String TYPE_LOGIN = "LOGIN";
- public static final String TYPE_LOGOUT = "LOGOUT";
- public static final String TYPE_INSERT = "INSERT";
- public static final String TYPE_DELETE = "DELETE";
public static final String LOG_LEVEL_IOTDB = "IOTDB";
public static final String LOG_LEVEL_LOGGER = "LOGGER";
public static final String LOG_LEVEL_NONE = "NONE";
- public static void writeAuditLog(String type, String log) {
+ public static void writeAuditLog(String log) {
+ writeAuditLog(log, false);
+ }
+
+ public static void writeAuditLog(String log, boolean enableWrite) {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
String auditLogStorage = config.getAuditLogStorage();
IClientSession currSession = SessionManager.getInstance().getCurrSession();
if (currSession == null) {
return;
}
+ ClientSession clientSession = (ClientSession) currSession;
+ String clientAddress = clientSession.getClientAddress();
+ int clientPort = ((ClientSession) currSession).getClientPort();
+ String address = String.format("%s:%s", clientAddress, clientPort);
String username = currSession.getUsername();
- if (LOG_LEVEL_IOTDB.equals(auditLogStorage)) {
- try {
- InsertRowPlan insertRowPlan =
- new InsertRowPlan(
- new PartialPath(String.format(AUDIT_LOG_DEVICE, username)),
- DateTimeUtils.currentTime(),
- new String[] {TYPE, LOG, USERNAME},
- new String[] {type, log, username});
- IoTDB.serviceProvider.getExecutor().insert(insertRowPlan);
- } catch (IllegalPathException | QueryProcessException e) {
- logger.error("write audit log series error,", e);
- }
- } else if (LOG_LEVEL_LOGGER.equals(auditLogStorage)) {
- if (type.contains(TYPE_INSERT)) {
- if (config.isEnableAuditLogWrite()) {
- AUDIT_LOGGER.debug("user:{},type:{},action:{}", username, type, log);
+ if (clientSession.isEnableAudit() || enableWrite) {
+ if (LOG_LEVEL_IOTDB.equals(auditLogStorage)) {
+ try {
+ InsertRowPlan insertRowPlan =
+ new InsertRowPlan(
+ new PartialPath(String.format(AUDIT_LOG_DEVICE, username)),
+ DateTimeUtils.currentTime(),
+ new String[] {LOG, USERNAME, ADDRESS},
+ new String[] {log, username, address});
+ IoTDB.serviceProvider.getExecutor().insert(insertRowPlan);
+ } catch (IllegalPathException | QueryProcessException e) {
+ logger.error("write audit log series error,", e);
}
- } else {
- AUDIT_LOGGER.info("user:{},type:{},action:{}", username, type, log);
+ } else if (LOG_LEVEL_LOGGER.equals(auditLogStorage)) {
+ AUDIT_LOGGER.info("user:{},address:{},log:{}", username, address, log);
}
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index 6304fb5a5a..49f8147c0f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -46,4 +46,6 @@ public class Config {
public static final int DEFAULT_SESSION_POOL_MAX_SIZE = 5;
public static final Version DEFAULT_VERSION = Version.V_0_13;
+
+ public static final boolean DEFAULT_ENABLE_AUDIT = true;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 964eef770e..8d57a1977d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -142,6 +142,8 @@ public class Session {
// The version number of the client which used for compatibility in the server
protected Version version;
+ protected boolean enableAudit = Config.DEFAULT_ENABLE_AUDIT;
+
public Session(String host, int rpcPort) {
this(
host,
@@ -248,6 +250,27 @@ public class Session {
Config.DEFAULT_VERSION);
}
+ public Session(
+ String host,
+ int rpcPort,
+ String username,
+ String password,
+ boolean enableCacheLeader,
+ boolean enableAudit) {
+ this(
+ host,
+ rpcPort,
+ username,
+ password,
+ Config.DEFAULT_FETCH_SIZE,
+ null,
+ Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
+ Config.DEFAULT_MAX_FRAME_SIZE,
+ enableCacheLeader,
+ enableAudit,
+ Config.DEFAULT_VERSION);
+ }
+
public Session(
String host,
int rpcPort,
@@ -292,6 +315,30 @@ public class Session {
this.version = version;
}
+ public Session(
+ String host,
+ int rpcPort,
+ String username,
+ String password,
+ int fetchSize,
+ ZoneId zoneId,
+ int thriftDefaultBufferSize,
+ int thriftMaxFrameSize,
+ boolean enableCacheLeader,
+ boolean enableAudit,
+ Version version) {
+ this.defaultEndPoint = new EndPoint(host, rpcPort);
+ this.username = username;
+ this.password = password;
+ this.fetchSize = fetchSize;
+ this.zoneId = zoneId;
+ this.thriftDefaultBufferSize = thriftDefaultBufferSize;
+ this.thriftMaxFrameSize = thriftMaxFrameSize;
+ this.enableCacheLeader = enableCacheLeader;
+ this.enableAudit = enableAudit;
+ this.version = version;
+ }
+
public Session(List<String> nodeUrls, String username, String password) {
this(
nodeUrls,
@@ -346,6 +393,30 @@ public class Session {
int thriftMaxFrameSize,
boolean enableCacheLeader,
Version version) {
+ this(
+ nodeUrls,
+ username,
+ password,
+ fetchSize,
+ zoneId,
+ thriftDefaultBufferSize,
+ thriftMaxFrameSize,
+ enableCacheLeader,
+ version,
+ true);
+ }
+
+ public Session(
+ List<String> nodeUrls,
+ String username,
+ String password,
+ int fetchSize,
+ ZoneId zoneId,
+ int thriftDefaultBufferSize,
+ int thriftMaxFrameSize,
+ boolean enableCacheLeader,
+ Version version,
+ boolean enableAudit) {
this.nodeUrls = nodeUrls;
this.username = username;
this.password = password;
@@ -355,6 +426,7 @@ public class Session {
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.enableCacheLeader = enableCacheLeader;
this.version = version;
+ this.enableAudit = enableAudit;
}
public void setFetchSize(int fetchSize) {
@@ -373,6 +445,10 @@ public class Session {
this.version = version;
}
+ public void setEnableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ }
+
public synchronized void open() throws IoTDBConnectionException {
open(false, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -3129,6 +3205,8 @@ public class Session {
private List<String> nodeUrls = null;
+ private boolean enableAudit = true;
+
public Builder host(String host) {
this.host = host;
return this;
@@ -3184,6 +3262,11 @@ public class Session {
return this;
}
+ public Builder enableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ return this;
+ }
+
public Session build() {
if (nodeUrls != null
&& (!Config.DEFAULT_HOST.equals(host) || rpcPort != Config.DEFAULT_PORT)) {
@@ -3202,7 +3285,8 @@ public class Session {
thriftDefaultBufferSize,
thriftMaxFrameSize,
enableCacheLeader,
- version);
+ version,
+ enableAudit);
newSession.setEnableQueryRedirection(true);
return newSession;
}
@@ -3217,6 +3301,7 @@ public class Session {
thriftDefaultBufferSize,
thriftMaxFrameSize,
enableCacheLeader,
+ enableAudit,
version);
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 1f0c6b6919..99fc006223 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -92,6 +92,8 @@ public class SessionConnection {
private EndPoint endPoint;
private List<EndPoint> endPointList = new ArrayList<>();
private boolean enableRedirect = false;
+ public static final String VERSION = "version";
+ public static final String AUTH_ENABLE_AUDIT = "enableAudit";
// TestOnly
public SessionConnection() {}
@@ -140,8 +142,8 @@ public class SessionConnection {
openReq.setUsername(session.username);
openReq.setPassword(session.password);
openReq.setZoneId(zoneId.toString());
- openReq.putToConfiguration("version", session.version.toString());
-
+ openReq.putToConfiguration(VERSION, session.version.toString());
+ openReq.putToConfiguration(AUTH_ENABLE_AUDIT, String.valueOf(session.enableAudit));
try {
TSOpenSessionResp openResp = client.openSession(openReq);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 711f7f66df..57522d79f5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -100,6 +100,8 @@ public class SessionPool {
// Redirect-able SessionPool
private final List<String> nodeUrls;
+ private final boolean enableAudit;
+
public SessionPool(String host, int port, String user, String password, int maxSize) {
this(
host,
@@ -145,6 +147,23 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(
+ String host, int port, String user, String password, boolean enableAudit, int maxSize) {
+ this(
+ host,
+ port,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ false,
+ null,
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS,
+ enableAudit);
+ }
+
public SessionPool(
List<String> nodeUrls, String user, String password, int maxSize, boolean enableCompression) {
this(
@@ -160,6 +179,22 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(
+ List<String> nodeUrls, String user, String password, boolean enableAudit, int maxSize) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ false,
+ null,
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS,
+ enableAudit);
+ }
+
public SessionPool(
String host,
int port,
@@ -246,6 +281,35 @@ public class SessionPool {
ZoneId zoneId,
boolean enableCacheLeader,
int connectionTimeoutInMs) {
+ this(
+ host,
+ port,
+ user,
+ password,
+ maxSize,
+ fetchSize,
+ waitToGetSessionTimeoutInMs,
+ enableCompression,
+ zoneId,
+ enableCacheLeader,
+ connectionTimeoutInMs,
+ true);
+ }
+
+ @SuppressWarnings("squid:S107")
+ public SessionPool(
+ String host,
+ int port,
+ String user,
+ String password,
+ int maxSize,
+ int fetchSize,
+ long waitToGetSessionTimeoutInMs,
+ boolean enableCompression,
+ ZoneId zoneId,
+ boolean enableCacheLeader,
+ int connectionTimeoutInMs,
+ boolean enableAudit) {
this.maxSize = maxSize;
this.host = host;
this.port = port;
@@ -258,6 +322,7 @@ public class SessionPool {
this.zoneId = zoneId;
this.enableCacheLeader = enableCacheLeader;
this.connectionTimeoutInMs = connectionTimeoutInMs;
+ this.enableAudit = enableAudit;
}
public SessionPool(
@@ -271,6 +336,32 @@ public class SessionPool {
ZoneId zoneId,
boolean enableCacheLeader,
int connectionTimeoutInMs) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ fetchSize,
+ waitToGetSessionTimeoutInMs,
+ enableCompression,
+ zoneId,
+ enableCacheLeader,
+ connectionTimeoutInMs,
+ true);
+ }
+
+ public SessionPool(
+ List<String> nodeUrls,
+ String user,
+ String password,
+ int maxSize,
+ int fetchSize,
+ long waitToGetSessionTimeoutInMs,
+ boolean enableCompression,
+ ZoneId zoneId,
+ boolean enableCacheLeader,
+ int connectionTimeoutInMs,
+ boolean enableAudit) {
this.maxSize = maxSize;
this.host = null;
this.port = -1;
@@ -283,6 +374,7 @@ public class SessionPool {
this.zoneId = zoneId;
this.enableCacheLeader = enableCacheLeader;
this.connectionTimeoutInMs = connectionTimeoutInMs;
+ this.enableAudit = enableAudit;
}
private Session constructNewSession() {
@@ -390,6 +482,7 @@ public class SessionPool {
session = constructNewSession();
try {
+ session.setEnableAudit(enableAudit);
session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
synchronized (this) {
@@ -2440,6 +2533,7 @@ public class SessionPool {
private ZoneId zoneId = null;
private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
private int connectionTimeoutInMs = Config.DEFAULT_CONNECTION_TIMEOUT_MS;
+ private boolean enableAudit = true;
public Builder host(String host) {
this.host = host;
@@ -2501,6 +2595,11 @@ public class SessionPool {
return this;
}
+ public Builder enableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ return this;
+ }
+
public SessionPool build() {
if (nodeUrls == null) {
return new SessionPool(
@@ -2514,7 +2613,8 @@ public class SessionPool {
enableCompression,
zoneId,
enableCacheLeader,
- connectionTimeoutInMs);
+ connectionTimeoutInMs,
+ enableAudit);
} else {
return new SessionPool(
nodeUrls,
@@ -2526,7 +2626,8 @@ public class SessionPool {
enableCompression,
zoneId,
enableCacheLeader,
- connectionTimeoutInMs);
+ connectionTimeoutInMs,
+ enableAudit);
}
}
}