You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/03/02 02:33:33 UTC
[iotdb] branch master updated: [IOTDB-5601] [Refactor] Remove AsyncConfigNodeHeartbeatServiceClient and AsyncDataNodeHeartbeatServiceClient as there core logic are duplicated (#9180)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4456a3995a [IOTDB-5601] [Refactor] Remove AsyncConfigNodeHeartbeatServiceClient and AsyncDataNodeHeartbeatServiceClient as there core logic are duplicated (#9180)
4456a3995a is described below
commit 4456a3995a1f042246d5c6a5069831b3b5c66765
Author: Potato <ta...@apache.org>
AuthorDate: Thu Mar 2 10:33:26 2023 +0800
[IOTDB-5601] [Refactor] Remove AsyncConfigNodeHeartbeatServiceClient and AsyncDataNodeHeartbeatServiceClient as there core logic are duplicated (#9180)
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../async/AsyncConfigNodeHeartbeatClientPool.java | 6 +-
.../async/AsyncDataNodeHeartbeatClientPool.java | 6 +-
.../iotdb/consensus/config/IoTConsensusConfig.java | 18 +++
.../iot/client/AsyncIoTConsensusServiceClient.java | 19 ++-
.../iot/client/IoTConsensusClientPool.java | 4 +
.../iot/client/SyncIoTConsensusServiceClient.java | 32 +++--
.../iotdb/commons/client/ClientPoolFactory.java | 24 ++--
.../iotdb/commons/client/IClientManager.java | 8 +-
.../iotdb/commons/client/IClientPoolFactory.java | 3 +
.../apache/iotdb/commons/client/ThriftClient.java | 29 ++++-
.../AsyncConfigNodeHeartbeatServiceClient.java | 144 ---------------------
.../async/AsyncConfigNodeIServiceClient.java | 23 ++--
.../async/AsyncDataNodeHeartbeatServiceClient.java | 144 ---------------------
.../async/AsyncDataNodeInternalServiceClient.java | 24 ++--
.../AsyncDataNodeMPPDataExchangeServiceClient.java | 23 ++--
.../client/property/ThriftClientProperty.java | 32 ++++-
.../client/sync/SyncConfigNodeIServiceClient.java | 32 +++--
.../sync/SyncDataNodeInternalServiceClient.java | 32 +++--
.../SyncDataNodeMPPDataExchangeServiceClient.java | 32 +++--
.../apache/iotdb/db/client/ConfigNodeClient.java | 25 ++--
20 files changed, 249 insertions(+), 411 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index 3c2e3072f3..e87572fd9b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -21,16 +21,16 @@ package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.async.AsyncConfigNodeHeartbeatServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
public class AsyncConfigNodeHeartbeatClientPool {
- private final IClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager;
+ private final IClientManager<TEndPoint, AsyncConfigNodeIServiceClient> clientManager;
private AsyncConfigNodeHeartbeatClientPool() {
clientManager =
- new IClientManager.Factory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient>()
+ new IClientManager.Factory<TEndPoint, AsyncConfigNodeIServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index 1a92dd6ac9..3c2e497674 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -21,18 +21,18 @@ package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeHeartbeatServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
public class AsyncDataNodeHeartbeatClientPool {
- private final IClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager;
+ private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
private AsyncDataNodeHeartbeatClientPool() {
clientManager =
- new IClientManager.Factory<TEndPoint, AsyncDataNodeHeartbeatServiceClient>()
+ new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 9c676fdaa9..ff2d2d4864 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -77,6 +77,8 @@ public class IoTConsensusConfig {
private final boolean isRpcThriftCompressionEnabled;
private final int selectorNumOfClientManager;
private final int connectionTimeoutInMs;
+
+ private final boolean printLogWhenThriftClientEncounterException;
private final int thriftMaxFrameSize;
private final int coreClientNumForEachNode;
private final int maxClientNumForEachNode;
@@ -89,6 +91,7 @@ public class IoTConsensusConfig {
boolean isRpcThriftCompressionEnabled,
int selectorNumOfClientManager,
int connectionTimeoutInMs,
+ boolean printLogWhenThriftClientEncounterException,
int thriftMaxFrameSize,
int coreClientNumForEachNode,
int maxClientNumForEachNode) {
@@ -99,6 +102,7 @@ public class IoTConsensusConfig {
this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
this.selectorNumOfClientManager = selectorNumOfClientManager;
this.connectionTimeoutInMs = connectionTimeoutInMs;
+ this.printLogWhenThriftClientEncounterException = printLogWhenThriftClientEncounterException;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.coreClientNumForEachNode = coreClientNumForEachNode;
this.maxClientNumForEachNode = maxClientNumForEachNode;
@@ -132,6 +136,10 @@ public class IoTConsensusConfig {
return connectionTimeoutInMs;
}
+ public boolean isPrintLogWhenThriftClientEncounterException() {
+ return printLogWhenThriftClientEncounterException;
+ }
+
public int getThriftMaxFrameSize() {
return thriftMaxFrameSize;
}
@@ -157,6 +165,8 @@ public class IoTConsensusConfig {
private boolean isRpcThriftCompressionEnabled = false;
private int selectorNumOfClientManager = 1;
private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
+
+ private boolean printLogWhenThriftClientEncounterException = true;
private int thriftMaxFrameSize = 536870912;
private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
@@ -199,6 +209,13 @@ public class IoTConsensusConfig {
return this;
}
+ public Builder setPrintLogWhenThriftClientEncounterException(
+ boolean printLogWhenThriftClientEncounterException) {
+ this.printLogWhenThriftClientEncounterException =
+ printLogWhenThriftClientEncounterException;
+ return this;
+ }
+
public RPC.Builder setThriftMaxFrameSize(int thriftMaxFrameSize) {
this.thriftMaxFrameSize = thriftMaxFrameSize;
return this;
@@ -223,6 +240,7 @@ public class IoTConsensusConfig {
isRpcThriftCompressionEnabled,
selectorNumOfClientManager,
connectionTimeoutInMs,
+ printLogWhenThriftClientEncounterException,
thriftMaxFrameSize,
coreClientNumForEachNode,
maxClientNumForEachNode);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index f84b583924..126b832db8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,20 +41,22 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
private static final Logger logger =
LoggerFactory.getLogger(AsyncIoTConsensusServiceClient.class);
+ private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
public AsyncIoTConsensusServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
+ ThriftClientProperty property,
TEndPoint endpoint,
TAsyncClientManager tClientManager,
ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager)
throws IOException {
super(
- protocolFactory,
+ property.getProtocolFactory(),
tClientManager,
- TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+ TNonblockingSocketWrapper.wrap(
+ endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs()));
+ this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endpoint;
this.clientManager = clientManager;
}
@@ -85,6 +86,11 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
clientManager.clear(endpoint);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
/**
* return self, the method doesn't need to be called by the user and will be triggered after the
* RPC is finished.
@@ -134,8 +140,7 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
throws Exception {
return new DefaultPooledObject<>(
new AsyncIoTConsensusServiceClient(
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty,
endPoint,
tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index 92b2cc28cf..a3eeae4699 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -54,6 +54,8 @@ public class IoTConsensusClientPool {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
+ .setPrintLogWhenEncounterException(
+ config.getRpc().isPrintLogWhenThriftClientEncounterException())
.build()),
new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>()
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
@@ -83,6 +85,8 @@ public class IoTConsensusClientPool {
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(
config.getRpc().getSelectorNumOfClientManager())
+ .setPrintLogWhenEncounterException(
+ config.getRpc().isPrintLogWhenThriftClientEncounterException())
.build(),
ThreadName.ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>()
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
index 638893b116..e3815d3c3e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
@@ -31,30 +31,32 @@ import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
implements ThriftClient, AutoCloseable {
+ private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager;
public SyncIoTConsensusServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
+ ThriftClientProperty property,
TEndPoint endpoint,
ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager)
throws TTransportException {
super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
- connectionTimeout))));
+ property
+ .getProtocolFactory()
+ .getProtocol(
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ endpoint.getIp(),
+ endpoint.getPort(),
+ property.getConnectionTimeoutMs()))));
+ this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endpoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
@@ -75,6 +77,11 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
clientManager.clear(endpoint);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
@Override
public String toString() {
return String.format("SyncIoTConsensusServiceClient{%s}", endpoint);
@@ -102,9 +109,8 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
SyncThriftClientWithErrorHandler.newErrorHandler(
SyncIoTConsensusServiceClient.class,
SyncIoTConsensusServiceClient.class.getConstructor(
- TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getClass(), endpoint.getClass(), clientManager.getClass()),
+ thriftClientProperty,
endpoint,
clientManager));
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index b30620f8be..b156ec4785 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -20,9 +20,7 @@
package org.apache.iotdb.commons.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.async.AsyncConfigNodeHeartbeatServiceClient;
import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeHeartbeatServiceClient;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
@@ -132,21 +130,22 @@ public class ClientPoolFactory {
}
public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
+ implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> createClientPool(
- ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> manager) {
+ public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
+ ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
return new GenericKeyedObjectPool<>(
- new AsyncConfigNodeHeartbeatServiceClient.Factory(
+ new AsyncConfigNodeIServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+ .setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncConfigNodeHeartbeatServiceClient>()
+ new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
@@ -155,21 +154,22 @@ public class ClientPoolFactory {
}
public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
+ implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncDataNodeHeartbeatServiceClient> createClientPool(
- ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> manager) {
+ public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
+ ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
- new AsyncDataNodeHeartbeatServiceClient.Factory(
+ new AsyncDataNodeInternalServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+ .setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncDataNodeHeartbeatServiceClient>()
+ new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
index aa3a09c837..81344e4671 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -30,12 +30,18 @@ public interface IClientManager<K, V> {
/**
* get a client V for node K from the IClientManager.
*
+ * @param node target node
+ * @return client
* @throws BorrowNullClientManagerException if node is null
* @throws ClientManagerException for other exceptions
*/
V borrowClient(K node) throws ClientManagerException;
- /** clear all clients for node K. */
+ /**
+ * clear all clients for node K.
+ *
+ * @param node target node
+ */
void clear(K node);
/** close IClientManager, which means closing all clients for all nodes. */
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
index 5f2c7a72f6..5382cf7190 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
@@ -26,6 +26,9 @@ public interface IClientPoolFactory<K, V> {
/**
* We can implement this interface in other modules and then set the corresponding expected
* parameters and client factory classes.
+ *
+ * @param manager the reference to the clientManager
+ * @return A concurrency safe object pool
*/
KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index b8124f939f..e9d3b9c36c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -42,6 +42,19 @@ public interface ThriftClient {
/** Removing all pooled instances corresponding to current instance's endpoint. */
void invalidateAll();
+ /**
+ * Whether to print logs when exceptions are encountered.
+ *
+ * @return result
+ */
+ boolean printLogWhenEncounterException();
+
+ /**
+ * Perform corresponding operations on ThriftClient o based on the Throwable t.
+ *
+ * @param t Throwable
+ * @param o ThriftClient
+ */
static void resolveException(Throwable t, ThriftClient o) {
Throwable origin = t;
if (t instanceof InvocationTargetException) {
@@ -72,15 +85,23 @@ public interface ThriftClient {
rootCause.getLocalizedMessage(),
rootCause);
if (isConnectionBroken(rootCause)) {
- logger.debug(
- "Broken pipe error happened in sending RPC,"
- + " we need to clear all previous cached connection",
- t);
+ if (o.printLogWhenEncounterException()) {
+ logger.info(
+ "Broken pipe error happened in sending RPC,"
+ + " we need to clear all previous cached connection",
+ t);
+ }
o.invalidateAll();
}
}
}
+ /**
+ * Determine whether the target node has gone offline once based on the cause.
+ *
+ * @param cause Throwable
+ * @return true/false
+ */
static boolean isConnectionBroken(Throwable cause) {
return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe"))
|| (cause instanceof TTransportException
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
deleted file mode 100644
index 86de8b6f02..0000000000
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.client.async;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ThriftClient;
-import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
-import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
-import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TProtocolFactory;
-
-import java.io.IOException;
-
-public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService.AsyncClient
- implements ThriftClient {
-
- private final TEndPoint endpoint;
- private final ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager;
-
- public AsyncConfigNodeHeartbeatServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
- TEndPoint endpoint,
- TAsyncClientManager tClientManager,
- ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager)
- throws IOException {
- super(
- protocolFactory,
- tClientManager,
- TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
- this.endpoint = endpoint;
- this.clientManager = clientManager;
- }
-
- @Override
- public void onComplete() {
- super.onComplete();
- returnSelf();
- }
-
- @Override
- public void onError(Exception e) {
- super.onError(e);
- ThriftClient.resolveException(e, this);
- returnSelf();
- }
-
- @Override
- public void invalidate() {
- if (!hasError()) {
- super.onError(new Exception("This client has been invalidated"));
- }
- }
-
- @Override
- public void invalidateAll() {
- clientManager.clear(endpoint);
- }
-
- /**
- * return self, the method doesn't need to be called by the user and will be triggered after the
- * RPC is finished.
- */
- private void returnSelf() {
- clientManager.returnClient(endpoint, this);
- }
-
- private void close() {
- ___transport.close();
- ___currentMethod = null;
- }
-
- public boolean isReady() {
- try {
- checkReady();
- return true;
- } catch (Exception e) {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return String.format("AsyncConfigNodeHeartbeatServiceClient{%s}", endpoint);
- }
-
- public static class Factory
- extends AsyncThriftClientFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
-
- public Factory(
- ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager,
- ThriftClientProperty thriftClientProperty,
- String threadName) {
- super(clientManager, thriftClientProperty, threadName);
- }
-
- @Override
- public void destroyObject(
- TEndPoint endPoint, PooledObject<AsyncConfigNodeHeartbeatServiceClient> pooledObject) {
- pooledObject.getObject().close();
- }
-
- @Override
- public PooledObject<AsyncConfigNodeHeartbeatServiceClient> makeObject(TEndPoint endPoint)
- throws Exception {
- return new DefaultPooledObject<>(
- new AsyncConfigNodeHeartbeatServiceClient(
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
- endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
- clientManager));
- }
-
- @Override
- public boolean validateObject(
- TEndPoint endPoint, PooledObject<AsyncConfigNodeHeartbeatServiceClient> pooledObject) {
- return pooledObject.getObject().isReady();
- }
- }
-}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index 91d64ab257..134c71cda1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,20 +40,22 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
private static final Logger logger = LoggerFactory.getLogger(AsyncConfigNodeIServiceClient.class);
+ private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> clientManager;
public AsyncConfigNodeIServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
+ ThriftClientProperty property,
TEndPoint endpoint,
TAsyncClientManager tClientManager,
ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> clientManager)
throws IOException {
super(
- protocolFactory,
+ property.getProtocolFactory(),
tClientManager,
- TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+ TNonblockingSocketWrapper.wrap(
+ endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs()));
+ this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endpoint;
this.clientManager = clientManager;
}
@@ -84,6 +85,11 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
clientManager.clear(endpoint);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
/**
* return self, the method doesn't need to be called by the user and will be triggered after the
* RPC is finished.
@@ -102,7 +108,9 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
checkReady();
return true;
} catch (Exception e) {
- logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
+ if (printLogWhenEncounterException) {
+ logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
+ }
return false;
}
}
@@ -133,8 +141,7 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
throws Exception {
return new DefaultPooledObject<>(
new AsyncConfigNodeIServiceClient(
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty,
endPoint,
tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
deleted file mode 100644
index 50c0540fd0..0000000000
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.client.async;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ThriftClient;
-import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
-import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TProtocolFactory;
-
-import java.io.IOException;
-
-public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.AsyncClient
- implements ThriftClient {
-
- private final TEndPoint endpoint;
- private final ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager;
-
- public AsyncDataNodeHeartbeatServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
- TEndPoint endpoint,
- TAsyncClientManager tClientManager,
- ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager)
- throws IOException {
- super(
- protocolFactory,
- tClientManager,
- TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
- this.endpoint = endpoint;
- this.clientManager = clientManager;
- }
-
- @Override
- public void onComplete() {
- super.onComplete();
- returnSelf();
- }
-
- @Override
- public void onError(Exception e) {
- super.onError(e);
- ThriftClient.resolveException(e, this);
- returnSelf();
- }
-
- @Override
- public void invalidate() {
- if (!hasError()) {
- super.onError(new Exception("This client has been invalidated"));
- }
- }
-
- @Override
- public void invalidateAll() {
- clientManager.clear(endpoint);
- }
-
- /**
- * return self, the method doesn't need to be called by the user and will be triggered after the
- * RPC is finished.
- */
- private void returnSelf() {
- clientManager.returnClient(endpoint, this);
- }
-
- private void close() {
- ___transport.close();
- ___currentMethod = null;
- }
-
- public boolean isReady() {
- try {
- checkReady();
- return true;
- } catch (Exception e) {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return String.format("AsyncDataNodeHeartbeatServiceClient{%s}", endpoint);
- }
-
- public static class Factory
- extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
-
- public Factory(
- ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager,
- ThriftClientProperty thriftClientProperty,
- String threadName) {
- super(clientManager, thriftClientProperty, threadName);
- }
-
- @Override
- public void destroyObject(
- TEndPoint endPoint, PooledObject<AsyncDataNodeHeartbeatServiceClient> pooledObject) {
- pooledObject.getObject().close();
- }
-
- @Override
- public PooledObject<AsyncDataNodeHeartbeatServiceClient> makeObject(TEndPoint endPoint)
- throws Exception {
- return new DefaultPooledObject<>(
- new AsyncDataNodeHeartbeatServiceClient(
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
- endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
- clientManager));
- }
-
- @Override
- public boolean validateObject(
- TEndPoint endPoint, PooledObject<AsyncDataNodeHeartbeatServiceClient> pooledObject) {
- return pooledObject.getObject().isReady();
- }
- }
-}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 46c6eec275..af21ff5bfd 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,20 +42,23 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
private static final Logger logger =
LoggerFactory.getLogger(AsyncDataNodeInternalServiceClient.class);
+ private final boolean printLogWhenEncounterException;
+
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
public AsyncDataNodeInternalServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
+ ThriftClientProperty property,
TEndPoint endpoint,
TAsyncClientManager tClientManager,
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager)
throws IOException {
super(
- protocolFactory,
+ property.getProtocolFactory(),
tClientManager,
- TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+ TNonblockingSocketWrapper.wrap(
+ endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs()));
+ this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endpoint;
this.clientManager = clientManager;
}
@@ -96,6 +98,11 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
clientManager.clear(endpoint);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
/**
* return self, the method doesn't need to be called by the user and will be triggered after the
* RPC is finished.
@@ -114,7 +121,9 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
checkReady();
return true;
} catch (Exception e) {
- logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
+ if (printLogWhenEncounterException) {
+ logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
+ }
return false;
}
}
@@ -145,8 +154,7 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
throws Exception {
return new DefaultPooledObject<>(
new AsyncDataNodeInternalServiceClient(
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty,
endPoint,
tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 7b056fc350..57d16c47b5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,20 +41,22 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
private static final Logger logger =
LoggerFactory.getLogger(AsyncDataNodeMPPDataExchangeServiceClient.class);
+ private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientManager;
public AsyncDataNodeMPPDataExchangeServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
+ ThriftClientProperty property,
TEndPoint endpoint,
TAsyncClientManager tClientManager,
ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientManager)
throws IOException {
super(
- protocolFactory,
+ property.getProtocolFactory(),
tClientManager,
- TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+ TNonblockingSocketWrapper.wrap(
+ endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs()));
+ this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endpoint;
this.clientManager = clientManager;
}
@@ -85,6 +86,11 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
clientManager.clear(endpoint);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
/**
* return self, the method doesn't need to be called by the user and will be triggered after the
* RPC is finished.
@@ -103,7 +109,9 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
checkReady();
return true;
} catch (Exception e) {
- logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
+ if (printLogWhenEncounterException) {
+ logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
+ }
return false;
}
}
@@ -134,8 +142,7 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
throws Exception {
return new DefaultPooledObject<>(
new AsyncDataNodeMPPDataExchangeServiceClient(
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty,
endPoint,
tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
index c54c23fbd7..29b2f12250 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
@@ -25,17 +25,23 @@ import org.apache.thrift.protocol.TProtocolFactory;
import java.util.concurrent.TimeUnit;
+/** This class defines the configurations commonly used by the Thrift Client. */
public class ThriftClientProperty {
private final TProtocolFactory protocolFactory;
private final int connectionTimeoutMs;
private final int selectorNumOfAsyncClientPool;
+ private final boolean printLogWhenEncounterException;
- public ThriftClientProperty(
- TProtocolFactory protocolFactory, int connectionTimeoutMs, int selectorNumOfAsyncClientPool) {
+ private ThriftClientProperty(
+ TProtocolFactory protocolFactory,
+ int connectionTimeoutMs,
+ int selectorNumOfAsyncClientPool,
+ boolean printLogWhenEncounterException) {
this.protocolFactory = protocolFactory;
this.connectionTimeoutMs = connectionTimeoutMs;
this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
+ this.printLogWhenEncounterException = printLogWhenEncounterException;
}
public TProtocolFactory getProtocolFactory() {
@@ -50,6 +56,10 @@ public class ThriftClientProperty {
return selectorNumOfAsyncClientPool;
}
+ public boolean isPrintLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
public static class Builder {
/** whether to use thrift compression. */
@@ -60,6 +70,13 @@ public class ThriftClientProperty {
private int selectorNumOfAsyncClientManager =
DefaultProperty.SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER;
+ /**
+ * Whether to print logs when the client encounters exceptions. For example, logs are not
+ * printed in the heartbeat client.
+ */
+ private boolean printLogWhenEncounterException =
+ DefaultProperty.PRINT_LOG_WHEN_ENCOUNTER_EXCEPTION;
+
public Builder setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
this.rpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
return this;
@@ -75,22 +92,29 @@ public class ThriftClientProperty {
return this;
}
+ public Builder setPrintLogWhenEncounterException(boolean printLogWhenEncounterException) {
+ this.printLogWhenEncounterException = printLogWhenEncounterException;
+ return this;
+ }
+
public ThriftClientProperty build() {
return new ThriftClientProperty(
rpcThriftCompressionEnabled
? new TCompactProtocol.Factory()
: new TBinaryProtocol.Factory(),
connectionTimeoutMs,
- selectorNumOfAsyncClientManager);
+ selectorNumOfAsyncClientManager,
+ printLogWhenEncounterException);
}
}
- public static class DefaultProperty {
+ private static class DefaultProperty {
private DefaultProperty() {}
public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false;
public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20);
public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1;
+ public static final boolean PRINT_LOG_WHEN_ENCOUNTER_EXCEPTION = true;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 6f6024ccd2..4e1df10520 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -40,23 +39,26 @@ import java.net.SocketException;
public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
implements ThriftClient, AutoCloseable {
+ private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
public SyncConfigNodeIServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
+ ThriftClientProperty property,
TEndPoint endPoint,
ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager)
throws TTransportException {
super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- endPoint.getIp(),
- endPoint.getPort(),
- connectionTimeout))));
+ property
+ .getProtocolFactory()
+ .getProtocol(
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ endPoint.getIp(),
+ endPoint.getPort(),
+ property.getConnectionTimeoutMs()))));
+ this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endPoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
@@ -86,6 +88,11 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
clientManager.clear(endpoint);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
@Override
public String toString() {
return String.format("SyncConfigNodeIServiceClient{%s}", endpoint);
@@ -112,9 +119,8 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
SyncThriftClientWithErrorHandler.newErrorHandler(
SyncConfigNodeIServiceClient.class,
SyncConfigNodeIServiceClient.class.getConstructor(
- TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getClass(), endpoint.getClass(), clientManager.getClass()),
+ thriftClientProperty,
endpoint,
clientManager));
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index cab879e08a..2b90b5b266 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -41,23 +40,26 @@ import java.net.SocketException;
public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Client
implements ThriftClient, AutoCloseable {
+ private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
public SyncDataNodeInternalServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
+ ThriftClientProperty property,
TEndPoint endpoint,
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
throws TTransportException {
super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
- connectionTimeout))));
+ property
+ .getProtocolFactory()
+ .getProtocol(
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ endpoint.getIp(),
+ endpoint.getPort(),
+ property.getConnectionTimeoutMs()))));
+ this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endpoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
@@ -97,6 +99,11 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
clientManager.clear(endpoint);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
@Override
public String toString() {
return String.format("SyncDataNodeInternalServiceClient{%s}", endpoint);
@@ -124,9 +131,8 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
SyncThriftClientWithErrorHandler.newErrorHandler(
SyncDataNodeInternalServiceClient.class,
SyncDataNodeInternalServiceClient.class.getConstructor(
- TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getClass(), endpoint.getClass(), clientManager.getClass()),
+ thriftClientProperty,
endpoint,
clientManager));
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
index dd3474e865..024b7515fc 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -40,23 +39,26 @@ import java.net.SocketException;
public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.Client
implements ThriftClient, AutoCloseable {
+ private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager;
public SyncDataNodeMPPDataExchangeServiceClient(
- TProtocolFactory protocolFactory,
- int connectionTimeout,
+ ThriftClientProperty property,
TEndPoint endpoint,
ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager)
throws TTransportException {
super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
- connectionTimeout))));
+ property
+ .getProtocolFactory()
+ .getProtocol(
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ endpoint.getIp(),
+ endpoint.getPort(),
+ property.getConnectionTimeoutMs()))));
+ this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endpoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
@@ -86,6 +88,11 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
clientManager.clear(endpoint);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
@Override
public String toString() {
return String.format("SyncDataNodeMPPDataExchangeServiceClient{%s}", endpoint);
@@ -113,9 +120,8 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
SyncThriftClientWithErrorHandler.newErrorHandler(
SyncDataNodeMPPDataExchangeServiceClient.class,
SyncDataNodeMPPDataExchangeServiceClient.class.getConstructor(
- TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getClass(), endpoint.getClass(), clientManager.getClass()),
+ thriftClientProperty,
endpoint,
clientManager));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 6dc2f13c4a..7ebe15a043 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -126,7 +126,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -147,7 +146,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
private static final int RETRY_INTERVAL_MS = 1000;
- private final long connectionTimeout;
+ private final ThriftClientProperty property;
private IConfigNodeRPCService.Iface client;
@@ -167,17 +166,13 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
ConfigRegionId configRegionId = ConfigNodeInfo.CONFIG_REGION_ID;
- TProtocolFactory protocolFactory;
-
public ConfigNodeClient(
List<TEndPoint> configNodes,
- TProtocolFactory protocolFactory,
- long connectionTimeout,
+ ThriftClientProperty property,
ClientManager<ConfigRegionId, ConfigNodeClient> clientManager)
throws TException {
this.configNodes = configNodes;
- this.protocolFactory = protocolFactory;
- this.connectionTimeout = connectionTimeout;
+ this.property = property;
this.clientManager = clientManager;
init();
@@ -198,7 +193,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
transport =
RpcTransportFactory.INSTANCE.getTransport(
// As there is a try-catch already, we do not need to use TSocket.wrap
- endpoint.getIp(), endpoint.getPort(), (int) connectionTimeout);
+ endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs());
if (!transport.isOpen()) {
transport.open();
}
@@ -207,7 +202,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException(e);
}
- client = new IConfigNodeRPCService.Client(protocolFactory.getProtocol(transport));
+ client = new IConfigNodeRPCService.Client(property.getProtocolFactory().getProtocol(transport));
}
private void waitAndReconnect() throws TException {
@@ -283,6 +278,11 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
clientManager.clear(ConfigNodeInfo.CONFIG_REGION_ID);
}
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return property.isPrintLogWhenEncounterException();
+ }
+
private boolean updateConfigNodeLeader(TSStatus status) {
if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
if (status.isSetRedirectNode()) {
@@ -1974,10 +1974,9 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
SyncThriftClientWithErrorHandler.newErrorHandler(
ConfigNodeClient.class,
ConfigNodeClient.class.getConstructor(
- List.class, TProtocolFactory.class, long.class, clientManager.getClass()),
+ List.class, thriftClientProperty.getClass(), clientManager.getClass()),
ConfigNodeInfo.getInstance().getLatestConfigNodes(),
- thriftClientProperty.getProtocolFactory(),
- thriftClientProperty.getConnectionTimeoutMs(),
+ thriftClientProperty,
clientManager));
}