You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ma...@apache.org on 2019/10/25 15:26:53 UTC
[airavata] branch staging updated: AIRAVATA-3186 Enable abandoned
object removal in ThriftClientPool
This is an automated email from the ASF dual-hosted git repository.
machristie pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push:
new c6a6ae9 AIRAVATA-3186 Enable abandoned object removal in ThriftClientPool
c6a6ae9 is described below
commit c6a6ae93201f563c2e677697dbdbdaa9f6a0261a
Author: Marcus Christie <ma...@iu.edu>
AuthorDate: Fri Oct 25 11:25:33 2019 -0400
AIRAVATA-3186 Enable abandoned object removal in ThriftClientPool
---
.../api/server/handler/AiravataServerHandler.java | 41 +++---
.../scigap/develop/group_vars/all/vars.yml | 2 +-
.../scigap/staging/group_vars/all/vars.yml | 2 +
dev-tools/ansible/roles/api-orch/defaults/main.yml | 2 +
.../templates/airavata-server.properties.j2 | 5 +
.../ansible/roles/helix_setup/defaults/main.yml | 2 +
.../parser-wm/airavata-server.properties.j2 | 7 +-
.../participant/airavata-server.properties.j2 | 7 +-
.../post-wm/airavata-server.properties.j2 | 7 +-
.../templates/pre-wm/airavata-server.properties.j2 | 7 +-
.../ansible/roles/job_monitor/defaults/main.yml | 2 +
.../email-monitor/airavata-server.properties.j2 | 7 +-
.../helix/impl/workflow/WorkflowManager.java | 20 +--
modules/commons/pom.xml | 12 ++
.../airavata/common/utils/ApplicationSettings.java | 7 +
.../airavata/common/utils/ThriftClientPool.java | 70 +++++++---
.../common/utils/ThriftClientPoolTest.java | 141 +++++++++++++++++++++
.../src/main/resources/airavata-server.properties | 5 +
.../src/main/resources/airavata-server.properties | 7 +-
.../apache/airavata/monitor/AbstractMonitor.java | 20 +--
.../messaging/RegistryServiceDBEventHandler.java | 21 +--
21 files changed, 324 insertions(+), 70 deletions(-)
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index ca9b95e..f945d51 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -88,7 +88,7 @@ import org.apache.airavata.registry.api.exception.RegistryServiceException;
import org.apache.airavata.service.security.interceptor.SecurityCheck;
import org.apache.airavata.sharing.registry.models.*;
import org.apache.airavata.sharing.registry.service.cpi.SharingRegistryService;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,23 +111,19 @@ public class AiravataServerHandler implements Airavata.Iface {
statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
experimentPublisher = MessagingFactory.getPublisher(Type.EXPERIMENT_LAUNCH);
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = 100;
- poolConfig.minIdle = 5;
- poolConfig.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
- poolConfig.testOnBorrow = true;
- poolConfig.testWhileIdle = true;
- poolConfig.numTestsPerEvictionRun = 10;
- poolConfig.maxWait = 3000;
-
- sharingClientPool = new ThriftClientPool<>(
- tProtocol -> new SharingRegistryService.Client(tProtocol), poolConfig, ServerSettings.getSharingRegistryHost(),
- Integer.parseInt(ServerSettings.getSharingRegistryPort()));
+ sharingClientPool = new ThriftClientPool<>(
+ tProtocol -> new SharingRegistryService.Client(tProtocol),
+ this.<SharingRegistryService.Client>createGenericObjectPoolConfig(),
+ ServerSettings.getSharingRegistryHost(), Integer.parseInt(ServerSettings.getSharingRegistryPort()));
registryClientPool = new ThriftClientPool<>(
- tProtocol -> new RegistryService.Client(tProtocol), poolConfig, ServerSettings.getRegistryServerHost(),
+ tProtocol -> new RegistryService.Client(tProtocol),
+ this.<RegistryService.Client>createGenericObjectPoolConfig(),
+ ServerSettings.getRegistryServerHost(),
Integer.parseInt(ServerSettings.getRegistryServerPort()));
csClientPool = new ThriftClientPool<>(
- tProtocol -> new CredentialStoreService.Client(tProtocol), poolConfig, ServerSettings.getCredentialStoreServerHost(),
+ tProtocol -> new CredentialStoreService.Client(tProtocol),
+ this.<CredentialStoreService.Client>createGenericObjectPoolConfig(),
+ ServerSettings.getCredentialStoreServerHost(),
Integer.parseInt(ServerSettings.getCredentialStoreServerPort()));
initSharingRegistry();
@@ -141,6 +137,21 @@ public class AiravataServerHandler implements Airavata.Iface {
}
}
+ private <T> GenericObjectPoolConfig<T> createGenericObjectPoolConfig() {
+
+ GenericObjectPoolConfig<T> poolConfig = new GenericObjectPoolConfig<T>();
+ poolConfig.setMaxTotal(100);
+ poolConfig.setMinIdle(5);
+ poolConfig.setBlockWhenExhausted(true);
+ poolConfig.setTestOnBorrow(true);
+ poolConfig.setTestWhileIdle(true);
+ // must set timeBetweenEvictionRunsMillis since eviction doesn't run unless that is positive
+ poolConfig.setTimeBetweenEvictionRunsMillis(5L * 60L * 1000L);
+ poolConfig.setNumTestsPerEvictionRun(10);
+ poolConfig.setMaxWaitMillis(3000);
+ return poolConfig;
+ }
+
/**
* This method creates a password token for the default gateway profile. Default gateway is originally initialized
* at the registry server but we can not add the password token at that step as the credential store is not initialized
diff --git a/dev-tools/ansible/inventories/scigap/develop/group_vars/all/vars.yml b/dev-tools/ansible/inventories/scigap/develop/group_vars/all/vars.yml
index 514e355..fd07982 100644
--- a/dev-tools/ansible/inventories/scigap/develop/group_vars/all/vars.yml
+++ b/dev-tools/ansible/inventories/scigap/develop/group_vars/all/vars.yml
@@ -174,4 +174,4 @@ parser_broker_consumer_group: "ParsingConsumer"
parser_storage_resource_id: "pgadev.scigap.org_7ddf28fd-d503-4ff8-bbc5-3279a7c3b99e"
parser_broker_publisher_id: "ParserProducer"
-
+thrift_client_pool_abandoned_removal_enabled: true
diff --git a/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml b/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
index 325c9e4..5af7d01 100644
--- a/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
+++ b/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
@@ -182,3 +182,5 @@ parser_broker_consumer_group: "ParsingConsumer"
# TODO: update storage resource id
parser_storage_resource_id: "pga.staging.scigap.org_aa63ffa0-a99f-4885-8f4b-81e3c4c4d737"
parser_broker_publisher_id: "ParserProducer"
+
+thrift_client_pool_abandoned_removal_enabled: true
diff --git a/dev-tools/ansible/roles/api-orch/defaults/main.yml b/dev-tools/ansible/roles/api-orch/defaults/main.yml
index 5855377..582a43b 100644
--- a/dev-tools/ansible/roles/api-orch/defaults/main.yml
+++ b/dev-tools/ansible/roles/api-orch/defaults/main.yml
@@ -44,3 +44,5 @@ default_registry_user : "admin"
default_registry_password : "admin"
api_orch_systemd_unit_file: "/etc/systemd/system/apiorch.service"
+
+thrift_client_pool_abandoned_removal_enabled: false
diff --git a/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2 b/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2
index 7839b90..44e717e 100644
--- a/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/api-orch/templates/airavata-server.properties.j2
@@ -323,3 +323,8 @@ iam.server.super.admin.password={{ iam_server_super_admin_password }}
# DB Event Manager Runner
###########################################################################
db_event_manager=org.apache.airavata.db.event.manager.DBEventManagerRunner
+
+###########################################################################
+# ThriftClientPool Configuration
+###########################################################################
+thrift.client.pool.abandoned.removal.enabled={{ thrift_client_pool_abandoned_removal_enabled }}
diff --git a/dev-tools/ansible/roles/helix_setup/defaults/main.yml b/dev-tools/ansible/roles/helix_setup/defaults/main.yml
index 0a78fa4..a6385b9 100644
--- a/dev-tools/ansible/roles/helix_setup/defaults/main.yml
+++ b/dev-tools/ansible/roles/helix_setup/defaults/main.yml
@@ -52,4 +52,6 @@ kafka_listener_port: 9092
kafka_rest_proxy_listener_port: 8082
local_data_location: "/tmp"
+
+thrift_client_pool_abandoned_removal_enabled: false
...
diff --git a/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
index 9fad984..adb8a28 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/parser-wm/airavata-server.properties.j2
@@ -58,4 +58,9 @@ kafka.parser.broker.consumer.group={{ parser_broker_consumer_group }}
kafka.parser.topic={{ parser_broker_topic }}
parser.storage.resource.id={{ parser_storage_resource_id }}
kafka.parsing.broker.publisher.id={{ parser_broker_publisher_id }}
-post.workflow.manager.loadbalance.clusters=False
\ No newline at end of file
+post.workflow.manager.loadbalance.clusters=False
+
+###########################################################################
+# ThriftClientPool Configuration
+###########################################################################
+thrift.client.pool.abandoned.removal.enabled={{ thrift_client_pool_abandoned_removal_enabled }}
diff --git a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
index cbed16d..dae242c 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
@@ -93,4 +93,9 @@ data.parser.delete.container=True
###########################################################################
# Data Staging Task Level Configurations
###########################################################################
-enable.streaming.transfer=True
\ No newline at end of file
+enable.streaming.transfer=True
+
+###########################################################################
+# ThriftClientPool Configuration
+###########################################################################
+thrift.client.pool.abandoned.removal.enabled={{ thrift_client_pool_abandoned_removal_enabled }}
diff --git a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
index 259e52c..7616f6b 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
@@ -52,4 +52,9 @@ experiment.launch..queue.name=experiment.launch.queue
# Zookeeper Server Configuration
###########################################################################
zookeeper.server.connection={{ zookeeper_connection_url }}
-zookeeper.timeout=30000
\ No newline at end of file
+zookeeper.timeout=30000
+
+###########################################################################
+# ThriftClientPool Configuration
+###########################################################################
+thrift.client.pool.abandoned.removal.enabled={{ thrift_client_pool_abandoned_removal_enabled }}
diff --git a/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
index cabede9..5cdcf7f 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
@@ -49,4 +49,9 @@ experiment.launch..queue.name=experiment.launch.queue
# Zookeeper Server Configuration
###########################################################################
zookeeper.server.connection={{ zookeeper_connection_url }}
-zookeeper.timeout=30000
\ No newline at end of file
+zookeeper.timeout=30000
+
+###########################################################################
+# ThriftClientPool Configuration
+###########################################################################
+thrift.client.pool.abandoned.removal.enabled={{ thrift_client_pool_abandoned_removal_enabled }}
diff --git a/dev-tools/ansible/roles/job_monitor/defaults/main.yml b/dev-tools/ansible/roles/job_monitor/defaults/main.yml
index 45a6a22..c2c655f 100644
--- a/dev-tools/ansible/roles/job_monitor/defaults/main.yml
+++ b/dev-tools/ansible/roles/job_monitor/defaults/main.yml
@@ -45,4 +45,6 @@ job_monitor_broker_consumer_group: "MonitoringConsumer"
helix_log_max_history: 30
helix_log_total_size_cap: "1GB"
+
+thrift_client_pool_abandoned_removal_enabled: false
...
diff --git a/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2 b/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
index 1e2c85d..2e50351 100644
--- a/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/job_monitor/templates/email-monitor/airavata-server.properties.j2
@@ -39,4 +39,9 @@ email.expiration.minutes=60
# Registry Server Configurations
###########################################################################
regserver.server.host={{ registry_host }}
-regserver.server.port={{ registry_port }}
\ No newline at end of file
+regserver.server.port={{ registry_port }}
+
+###########################################################################
+# ThriftClientPool Configuration
+###########################################################################
+thrift.client.pool.abandoned.removal.enabled={{ thrift_client_pool_abandoned_removal_enabled }}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
index ac21d2e..48eb3aa 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
@@ -17,7 +17,7 @@ import org.apache.airavata.model.process.ProcessWorkflow;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.api.RegistryService;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -98,14 +98,16 @@ public class WorkflowManager {
}
private void initRegistryClientPool() throws ApplicationSettingsException {
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = 100;
- poolConfig.minIdle = 5;
- poolConfig.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
- poolConfig.testOnBorrow = true;
- poolConfig.testWhileIdle = true;
- poolConfig.numTestsPerEvictionRun = 10;
- poolConfig.maxWait = 3000;
+ GenericObjectPoolConfig<RegistryService.Client> poolConfig = new GenericObjectPoolConfig<>();
+ poolConfig.setMaxTotal(100);
+ poolConfig.setMinIdle(5);
+ poolConfig.setBlockWhenExhausted(true);
+ poolConfig.setTestOnBorrow(true);
+ poolConfig.setTestWhileIdle(true);
+ // must set timeBetweenEvictionRunsMillis since eviction doesn't run unless that is positive
+ poolConfig.setTimeBetweenEvictionRunsMillis(5L * 60L * 1000L);
+ poolConfig.setNumTestsPerEvictionRun(10);
+ poolConfig.setMaxWaitMillis(3000);
this.registryClientPool = new ThriftClientPool<>(
RegistryService.Client::new, poolConfig, ServerSettings.getRegistryServerHost(),
diff --git a/modules/commons/pom.xml b/modules/commons/pom.xml
index 68fedfc..76a3ce9 100644
--- a/modules/commons/pom.xml
+++ b/modules/commons/pom.xml
@@ -57,6 +57,11 @@
<version>1.4</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
@@ -100,6 +105,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <version>${junit.version}</version>
</dependency>
<dependency>
<groupId>net.sf.dozer</groupId>
@@ -129,6 +135,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.jmockit</groupId>
+ <artifactId>jmockit</artifactId>
+ <version>1.39</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
index 036d6bf..f0a75c3 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
@@ -52,6 +52,9 @@ public class ApplicationSettings {
private final static Logger logger = LoggerFactory.getLogger(ApplicationSettings.class);
private static final String SHUTDOWN_STATEGY_STRING="shutdown.strategy";
+
+ // ThriftClientPool Constants
+ private static final String THRIFT_CLIENT_POOL_ABANDONED_REMOVAL_ENABLED = "thrift.client.pool.abandoned.removal.enabled";
protected static ApplicationSettings INSTANCE;
public static enum ShutdownStrategy{
@@ -435,6 +438,10 @@ public class ApplicationSettings {
return getSetting(ServerSettings.IAM_SERVER_URL);
}
+ public static boolean isThriftClientPoolAbandonedRemovalEnabled() {
+ return Boolean.valueOf(getSetting(THRIFT_CLIENT_POOL_ABANDONED_REMOVAL_ENABLED, "false"));
+ }
+
/**
* @deprecated use {{@link #getSetting(String)}}
* @return
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ThriftClientPool.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ThriftClientPool.java
index 43dd926..6a0bc2a 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ThriftClientPool.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ThriftClientPool.java
@@ -20,8 +20,12 @@
package org.apache.airavata.common.utils;
import org.apache.airavata.base.api.BaseAPI;
-import org.apache.commons.pool.BasePoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.AbandonedConfig;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
@@ -34,44 +38,65 @@ public class ThriftClientPool<T extends BaseAPI.Client> implements AutoCloseable
private static final Logger logger = LoggerFactory.getLogger(ThriftClientPool.class);
- private final GenericObjectPool internalPool;
+ private final GenericObjectPool<T> internalPool;
- public ThriftClientPool(ClientFactory<T> clientFactory,
- GenericObjectPool.Config poolConfig, String host, int port) {
- this(clientFactory, new BinaryOverSocketProtocolFactory(host, port),
- poolConfig);
+ public ThriftClientPool(ClientFactory<T> clientFactory, GenericObjectPoolConfig<T> poolConfig, String host,
+ int port) {
+ this(clientFactory, new BinaryOverSocketProtocolFactory(host, port), poolConfig);
}
- public ThriftClientPool(ClientFactory<T> clientFactory,
- ProtocolFactory protocolFactory, GenericObjectPool.Config poolConfig) {
- this.internalPool = new GenericObjectPool(new ThriftClientFactory(
- clientFactory, protocolFactory), poolConfig);
+ public ThriftClientPool(ClientFactory<T> clientFactory, ProtocolFactory protocolFactory,
+ GenericObjectPoolConfig<T> poolConfig) {
+
+ AbandonedConfig abandonedConfig = null;
+ if (ApplicationSettings.isThriftClientPoolAbandonedRemovalEnabled()) {
+ abandonedConfig = new AbandonedConfig();
+ abandonedConfig.setLogAbandoned(true);
+ abandonedConfig.setRemoveAbandonedOnBorrow(true);
+ abandonedConfig.setRemoveAbandonedOnMaintenance(true);
+ }
+ this.internalPool = new GenericObjectPool<T>(new ThriftClientFactory(clientFactory, protocolFactory),
+ poolConfig, abandonedConfig);
+ }
+
+ public ThriftClientPool(ClientFactory<T> clientFactory, ProtocolFactory protocolFactory,
+ GenericObjectPoolConfig<T> poolConfig, AbandonedConfig abandonedConfig) {
+
+ if (abandonedConfig != null && abandonedConfig.getRemoveAbandonedOnMaintenance()
+ && poolConfig.getTimeBetweenEvictionRunsMillis() <= 0) {
+ logger.warn("Abandoned removal is enabled but"
+ + " removeAbandonedOnMaintenance won't run since"
+ + " timeBetweenEvictionRunsMillis is not positive, current value: {}",
+ poolConfig.getTimeBetweenEvictionRunsMillis());
+ }
+ this.internalPool = new GenericObjectPool<T>(new ThriftClientFactory(clientFactory, protocolFactory),
+ poolConfig, abandonedConfig);
}
- class ThriftClientFactory extends BasePoolableObjectFactory {
+ class ThriftClientFactory extends BasePooledObjectFactory<T> {
private ClientFactory<T> clientFactory;
private ProtocolFactory protocolFactory;
- public ThriftClientFactory(ClientFactory<T> clientFactory,
- ProtocolFactory protocolFactory) {
+ public ThriftClientFactory(ClientFactory<T> clientFactory, ProtocolFactory protocolFactory) {
this.clientFactory = clientFactory;
this.protocolFactory = protocolFactory;
}
@Override
- public T makeObject() throws Exception {
+ public T create() throws Exception {
try {
TProtocol protocol = protocolFactory.make();
return clientFactory.make(protocol);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
- throw new ThriftClientException(
- "Can not make a new object for pool", e);
+ throw new ThriftClientException("Can not make a new object for pool", e);
}
}
- public void destroyObject(T obj) throws Exception {
+ @Override
+ public void destroyObject(PooledObject<T> pooledObject) throws Exception {
+ T obj = pooledObject.getObject();
if (obj.getOutputProtocol().getTransport().isOpen()) {
obj.getOutputProtocol().getTransport().close();
}
@@ -79,6 +104,11 @@ public class ThriftClientPool<T extends BaseAPI.Client> implements AutoCloseable
obj.getInputProtocol().getTransport().close();
}
}
+
+ @Override
+ public PooledObject<T> wrap(T obj) {
+ return new DefaultPooledObject<T>(obj);
+ }
}
public static interface ClientFactory<T> {
@@ -129,7 +159,7 @@ public class ThriftClientPool<T extends BaseAPI.Client> implements AutoCloseable
try {
for( int i = 0; i < 10 ; i++) {
// This tries to fetch a client from the pool and validate it before returning.
- final T client = (T) internalPool.borrowObject();
+ final T client = internalPool.borrowObject();
try {
String apiVersion = client.getAPIVersion();
logger.debug("Validated client and fetched api version " + apiVersion);
@@ -183,4 +213,4 @@ public class ThriftClientPool<T extends BaseAPI.Client> implements AutoCloseable
throw new ThriftClientException("Could not destroy the pool", e);
}
}
-}
\ No newline at end of file
+}
diff --git a/modules/commons/src/test/java/org/apache/airavata/common/utils/ThriftClientPoolTest.java b/modules/commons/src/test/java/org/apache/airavata/common/utils/ThriftClientPoolTest.java
new file mode 100644
index 0000000..8bdefe9
--- /dev/null
+++ b/modules/commons/src/test/java/org/apache/airavata/common/utils/ThriftClientPoolTest.java
@@ -0,0 +1,141 @@
+package org.apache.airavata.common.utils;
+
+import org.apache.airavata.base.api.BaseAPI;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.commons.pool2.impl.AbandonedConfig;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.Verifications;
+import mockit.integration.junit4.JMockit;
+
+@RunWith(JMockit.class)
+public class ThriftClientPoolTest {
+
+ @Mocked
+ private BaseAPI.Client mockClient;
+
+ @Test
+ public void testWithDefaultConfig() throws TException {
+ new Expectations() {
+ {
+ mockClient.getAPIVersion();
+ result = "0.19";
+ mockClient.getInputProtocol().getTransport().isOpen();
+ result = true;
+ mockClient.getOutputProtocol().getTransport().isOpen();
+ result = true;
+ }
+ };
+
+ GenericObjectPoolConfig<BaseAPI.Client> poolConfig = new GenericObjectPoolConfig<>();
+ ThriftClientPool<BaseAPI.Client> thriftClientPool = new ThriftClientPool<>((protocol) -> mockClient, () -> null,
+ poolConfig);
+ BaseAPI.Client client = thriftClientPool.getResource();
+ thriftClientPool.returnResource(client);
+ thriftClientPool.close();
+
+ new Verifications() {
+ {
+ mockClient.getInputProtocol().getTransport().close();
+ mockClient.getOutputProtocol().getTransport().close();
+ }
+ };
+ }
+
+ @Test
+ public void testWithAbandonConfigAndAbandoned() throws TException {
+
+ new Expectations() {
+ {
+ mockClient.getAPIVersion();
+ result = "0.19";
+ mockClient.getInputProtocol().getTransport().isOpen();
+ result = true;
+ mockClient.getOutputProtocol().getTransport().isOpen();
+ result = true;
+ }
+ };
+
+ GenericObjectPoolConfig<BaseAPI.Client> poolConfig = new GenericObjectPoolConfig<>();
+ // timeBetweenEvictionRunsMillis must be positive for abandoned removal on
+ // maintenance to run
+ poolConfig.setTimeBetweenEvictionRunsMillis(1);
+ AbandonedConfig abandonedConfig = new AbandonedConfig();
+ abandonedConfig.setRemoveAbandonedTimeout(1);
+ abandonedConfig.setRemoveAbandonedOnMaintenance(true);
+ abandonedConfig.setLogAbandoned(true);
+ ThriftClientPool<BaseAPI.Client> thriftClientPool = new ThriftClientPool<>((protocol) -> mockClient, () -> null,
+ poolConfig, abandonedConfig);
+ thriftClientPool.getResource();
+ try {
+ // Sleep long enough for the client to be considered abandoned
+ Thread.sleep(1001);
+ thriftClientPool.close();
+ } catch (InterruptedException e) {
+ Assert.fail("sleep interrupted");
+ }
+
+ new Verifications() {
+ {
+ // Verify client is destroyed when abandoned
+ mockClient.getInputProtocol().getTransport().close();
+ times = 1;
+ mockClient.getOutputProtocol().getTransport().close();
+ times = 1;
+ }
+ };
+ }
+
+ /**
+ * Just like #{@link #testWithAbandonConfigAndAbandoned()} but using default
+ * configuration.
+ *
+ * @throws TException
+ * @throws ApplicationSettingsException
+ */
+ @Test
+ @Ignore("Test requires long wait time to account for default removeAbandonedTimeout")
+ public void testWithDefaultAbandonedRemovalEnabled() throws TException, ApplicationSettingsException {
+
+ new Expectations() {
+ {
+ mockClient.getAPIVersion();
+ result = "0.19";
+ mockClient.getInputProtocol().getTransport().isOpen();
+ result = true;
+ mockClient.getOutputProtocol().getTransport().isOpen();
+ result = true;
+ }
+ };
+
+ GenericObjectPoolConfig<BaseAPI.Client> poolConfig = new GenericObjectPoolConfig<>();
+ // timeBetweenEvictionRunsMillis must be positive for abandoned removal on
+ // maintenance to run
+ poolConfig.setTimeBetweenEvictionRunsMillis(1);
+ ServerSettings.setSetting("thrift.client.pool.abandoned.removal.enabled", "true");
+ ThriftClientPool<BaseAPI.Client> thriftClientPool = new ThriftClientPool<>((protocol) -> mockClient, () -> null,
+ poolConfig);
+ thriftClientPool.getResource();
+ try {
+ // Sleep long enough for the client to be considered abandoned
+ // Default removeAbandonedTimeout is 300 seconds
+ Thread.sleep(new AbandonedConfig().getRemoveAbandonedTimeout() * 1000 + 1);
+ thriftClientPool.close();
+ } catch (InterruptedException e) {
+ Assert.fail("sleep interrupted");
+ }
+
+ new Verifications() {{
+ // Verify client is destroyed when abandoned
+ mockClient.getInputProtocol().getTransport().close(); times = 1;
+ mockClient.getOutputProtocol().getTransport().close(); times = 1;
+ }};
+ }
+}
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index e431f13..ec18c3f 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -357,3 +357,8 @@ iam.server.super.admin.password=password
# DB Event Manager Runner
###########################################################################
db_event_manager=org.apache.airavata.db.event.manager.DBEventManagerRunner
+
+###########################################################################
+# ThriftClientPool Configuration
+###########################################################################
+thrift.client.pool.abandoned.removal.enabled=false
diff --git a/modules/ide-integration/src/main/resources/airavata-server.properties b/modules/ide-integration/src/main/resources/airavata-server.properties
index 735d359..f3745ec 100644
--- a/modules/ide-integration/src/main/resources/airavata-server.properties
+++ b/modules/ide-integration/src/main/resources/airavata-server.properties
@@ -276,4 +276,9 @@ email.expiration.minutes=60
email.based.monitoring.period=10000
email.based.monitor.address=CHANGEME
-email.based.monitor.password=CHANGEME
\ No newline at end of file
+email.based.monitor.password=CHANGEME
+
+###########################################################################
+# ThriftClientPool Configuration
+###########################################################################
+thrift.client.pool.abandoned.removal.enabled=true
diff --git a/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java b/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
index 92efa00..69bf883 100644
--- a/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
+++ b/modules/job-monitor/job-monitor-api/src/main/java/org/apache/airavata/monitor/AbstractMonitor.java
@@ -6,7 +6,7 @@ import org.apache.airavata.common.utils.ThriftClientPool;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.monitor.kafka.MessageProducer;
import org.apache.airavata.registry.api.RegistryService;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -35,14 +35,16 @@ public class AbstractMonitor {
private void initRegistryClientPool() throws ApplicationSettingsException {
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = 100;
- poolConfig.minIdle = 5;
- poolConfig.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
- poolConfig.testOnBorrow = true;
- poolConfig.testWhileIdle = true;
- poolConfig.numTestsPerEvictionRun = 10;
- poolConfig.maxWait = 3000;
+ GenericObjectPoolConfig<RegistryService.Client> poolConfig = new GenericObjectPoolConfig<>();
+ poolConfig.setMaxTotal(100);
+ poolConfig.setMinIdle(5);
+ poolConfig.setBlockWhenExhausted(true);
+ poolConfig.setTestOnBorrow(true);
+ poolConfig.setTestWhileIdle(true);
+ // must set timeBetweenEvictionRunsMillis since eviction doesn't run unless that is positive
+ poolConfig.setTimeBetweenEvictionRunsMillis(5L * 60L * 1000L);
+ poolConfig.setNumTestsPerEvictionRun(10);
+ poolConfig.setMaxWaitMillis(3000);
this.registryClientPool = new ThriftClientPool<>(
RegistryService.Client::new, poolConfig, ServerSettings.getRegistryServerHost(),
diff --git a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
index 3a2b662..2eb29da 100644
--- a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
+++ b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
@@ -21,7 +21,6 @@ package org.apache.airavata.registry.api.service.messaging;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.DBEventManagerConstants;
import org.apache.airavata.common.utils.DBEventService;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftClientPool;
@@ -39,7 +38,7 @@ import org.apache.airavata.model.workspace.Gateway;
import org.apache.airavata.model.workspace.Project;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.airavata.registry.api.exception.RegistryServiceException;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,14 +56,16 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
public RegistryServiceDBEventHandler() throws ApplicationSettingsException, RegistryServiceException {
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = 5;
- poolConfig.minIdle = 1;
- poolConfig.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
- poolConfig.testOnBorrow = true;
- poolConfig.testWhileIdle = true;
- poolConfig.numTestsPerEvictionRun = 10;
- poolConfig.maxWait = 3000;
+ GenericObjectPoolConfig<RegistryService.Client> poolConfig = new GenericObjectPoolConfig<>();
+ poolConfig.setMaxTotal(5);
+ poolConfig.setMinIdle(1);
+ poolConfig.setBlockWhenExhausted(true);
+ poolConfig.setTestOnBorrow(true);
+ poolConfig.setTestWhileIdle(true);
+ // must set timeBetweenEvictionRunsMillis since eviction doesn't run unless that is positive
+ poolConfig.setTimeBetweenEvictionRunsMillis(5L * 60L * 1000L);
+ poolConfig.setNumTestsPerEvictionRun(10);
+ poolConfig.setMaxWaitMillis(3000);
registryClientPool = new ThriftClientPool<>(
tProtocol -> new RegistryService.Client(tProtocol), poolConfig, ServerSettings.getRegistryServerHost(),