You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/11/03 18:38:44 UTC
[2/4] airavata git commit: Make AuroraThriftClient a synchronized singleton & Reconnect with Aurora scheduler on leader rotation
Make AuroraThriftClient a synchronized singleton & Reconnect with Aurora scheduler on leader rotation
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/946143a7
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/946143a7
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/946143a7
Branch: refs/heads/develop
Commit: 946143a74c6737ed4adcdbb545148aff7dcab241
Parents: 593331b
Author: Gourav Shenoy <sh...@gmail.com>
Authored: Thu Nov 3 14:12:08 2016 -0400
Committer: Gourav Shenoy <sh...@gmail.com>
Committed: Thu Nov 3 14:12:08 2016 -0400
----------------------------------------------------------------------
.../cloud/aurora/client/AuroraThriftClient.java | 233 ++++++++++++++-----
.../aurora/util/AuroraThriftClientUtil.java | 2 +-
2 files changed, 176 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/946143a7/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
index 977479b..e955dc5 100644
--- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
+++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
@@ -39,6 +39,7 @@ import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil;
import org.apache.airavata.cloud.aurora.util.Constants;
import org.apache.airavata.cloud.aurora.util.ResponseResultType;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,31 +74,101 @@ public class AuroraThriftClient {
public static AuroraThriftClient getAuroraThriftClient() throws Exception {
try {
+ // take the lock on every call: the field declaration is not part of this change, and unless
+ // thriftClient is volatile an unsynchronized null check could see a partially built instance
+ synchronized(AuroraThriftClient.class) {
if(thriftClient == null) {
- thriftClient = new AuroraThriftClient();
-
- // construct connection url for scheduler
- String auroraHosts = ServerSettings.getAuroraSchedulerHosts();
- Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout();
-
- // check reachable scheduler host
+ // connect a local instance and publish it only once both scheduler clients exist, so a
+ // failed attempt does not leave a half-initialized singleton behind for later callers
+ AuroraThriftClient client = new AuroraThriftClient();
+
+ // construct connection url for scheduler
+ String auroraHosts = ServerSettings.getAuroraSchedulerHosts();
+ Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout();
+
+ // aurora hosts not defined in the properties file
+ if(auroraHosts == null) {
+ throw new Exception("Aurora hosts not specified in airavata-server.properties file.");
+ }
+
+ // connect to the first reachable scheduler host
+ for(String auroraHost : auroraHosts.split(",")) {
+ // malformed host string, should be of form <host:port>
+ String[] hostAndPort = auroraHost.split(":");
+ if(hostAndPort.length != 2) {
+ throw new Exception("Scheduler Host String: " + auroraHost + ", is malformed. Should be of form <hostname:port>!");
+ }
+
+ // construct connection-url from hostname & port
+ String connectionUrl = MessageFormat.format(Constants.AURORA_SCHEDULER_CONNECTION_URL, hostAndPort[0], hostAndPort[1]);
+
+ // verify if connection succeeds
+ if(AuroraThriftClientUtil.isSchedulerHostReachable(connectionUrl, connectTimeout)) {
+ client.readOnlySchedulerClient = AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, connectTimeout);
+ client.auroraSchedulerManagerClient = AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, connectTimeout);
+ break;
+ }
+ }
+
+ // check if scheduler connection successful
+ if(client.auroraSchedulerManagerClient == null ||
+ client.readOnlySchedulerClient == null) {
+ throw new Exception("None of the Aurora scheduler hosts were reachable, hence connection not established!");
+ }
+
+ // fully initialized: publish the singleton
+ thriftClient = client;
+ }
+ return thriftClient;
+ }
+ } catch(Exception ex) {
+ logger.error(ex.getMessage(), ex);
+ throw ex;
+ }
+ }
+
+
+ /**
+ * Reconnects this client with the first reachable Aurora scheduler host, e.g. after a
+ * transport failure caused by scheduler leader rotation.
+ *
+ * Synchronized so that concurrent callers cannot interleave and leave the read-only and
+ * manager clients pointing at different hosts.
+ * NOTE(review): the clients being replaced are not closed here; confirm whether their
+ * transports need an explicit close to avoid leaking connections.
+ *
+ * @return true, if a reachable host was found and both clients were re-created
+ */
+ private synchronized boolean reconnectWithAuroraScheduler() {
+ boolean connectionSuccess = false;
+
+ try {
+ // construct connection url for scheduler
+ String auroraHosts = ServerSettings.getAuroraSchedulerHosts();
+ Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout();
+
+ // check reachable scheduler host
+ if(auroraHosts != null) {
for(String auroraHost : auroraHosts.split(",")) {
+ // malformed host string, should be of form <host:port>
+ if(auroraHost.split(":").length != 2) {
+ throw new Exception("Scheduler Host String: " + auroraHost + ", is malformed. Should be of form <hostname:port>!");
+ }
+
+ // read hostname, port & construct connection-url
String hostname = auroraHost.split(":")[0];
String port = auroraHost.split(":")[1];
String connectionUrl = MessageFormat.format(Constants.AURORA_SCHEDULER_CONNECTION_URL, hostname, port);
+ // verify if connection succeeds
if(AuroraThriftClientUtil.isSchedulerHostReachable(connectionUrl, connectTimeout)) {
- thriftClient.readOnlySchedulerClient = AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, connectTimeout);
- thriftClient.auroraSchedulerManagerClient = AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, connectTimeout);
+ // update the instance whose call failed, not whatever the static field holds
+ this.readOnlySchedulerClient = AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, connectTimeout);
+ this.auroraSchedulerManagerClient = AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, connectTimeout);
+
+ // set connection-success flag; stop at the first reachable host instead of
+ // reconnecting to (and overwriting with) every reachable host in the list
+ connectionSuccess = true;
+ break;
}
}
+ } else {
+ // aurora hosts not defined in the properties file
+ throw new Exception("Aurora hosts not specified in airavata-server.properties file.");
}
} catch(Exception ex) {
logger.error(ex.getMessage(), ex);
- throw ex;
}
- return thriftClient;
+ return connectionSuccess;
}
+
/**
* Creates the job.
*
@@ -107,16 +178,26 @@ public class AuroraThriftClient {
*/
public ResponseBean createJob(JobConfigBean jobConfigBean) throws Exception {
ResponseBean response = null;
- try {
- if(jobConfigBean != null) {
- JobConfiguration jobConfig = AuroraThriftClientUtil.getAuroraJobConfig(jobConfigBean);
- Response createJobResponse = this.auroraSchedulerManagerClient.createJob(jobConfig);
- response = AuroraThriftClientUtil.getResponseBean(createJobResponse, ResponseResultType.CREATE_JOB);
+ // nothing to submit: return null as before, instead of spinning forever in the retry loop
+ if(jobConfigBean == null) {
+ return response;
+ }
+
+ // retry only after a transport failure that a successful re-connection addresses (e.g. scheduler
+ // leader rotation), and at most maxAttempts times, so a persistently failing scheduler cannot keep
+ // the caller looping; a null response bean is returned as-is instead of triggering another attempt.
+ // NOTE(review): job creation may not be idempotent -- if the scheduler accepted the job before the
+ // transport failed, the retried createJob may be rejected as a duplicate; confirm.
+ final int maxAttempts = 3;
+ for(int attempt = 1; ; attempt++) {
+ try {
+ JobConfiguration jobConfig = AuroraThriftClientUtil.getAuroraJobConfig(jobConfigBean);
+ Response createJobResponse = this.auroraSchedulerManagerClient.createJob(jobConfig);
+ response = AuroraThriftClientUtil.getResponseBean(createJobResponse, ResponseResultType.CREATE_JOB);
+ break;
+ } catch(Exception ex) {
+ if(ex instanceof TTransportException && attempt < maxAttempts && this.reconnectWithAuroraScheduler()) {
+ continue;
+ }
+ logger.error(ex.getMessage(), ex);
+ throw ex;
}
- } catch(Exception ex) {
- logger.error(ex.getMessage(), ex);
- throw ex;
}
+
return response;
}
@@ -130,15 +211,24 @@ public class AuroraThriftClient {
*/
public ResponseBean killTasks(JobKeyBean jobKeyBean, Set<Integer> instances) throws Exception {
ResponseBean response = null;
- try {
- if(jobKeyBean != null) {
- JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
- Response killTaskResponse = this.auroraSchedulerManagerClient.killTasks(jobKey, instances);
- response = AuroraThriftClientUtil.getResponseBean(killTaskResponse, ResponseResultType.KILL_TASKS);
+ // nothing to kill: return null as before, instead of spinning forever in the retry loop
+ if(jobKeyBean == null) {
+ return response;
+ }
+
+ // retry only after a transport failure that a successful re-connection addresses (e.g. scheduler
+ // leader rotation), and at most maxAttempts times, so a persistently failing scheduler cannot keep
+ // the caller looping; a null response bean is returned as-is instead of triggering another attempt
+ final int maxAttempts = 3;
+ for(int attempt = 1; ; attempt++) {
+ try {
+ JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+ Response killTaskResponse = this.auroraSchedulerManagerClient.killTasks(jobKey, instances);
+ response = AuroraThriftClientUtil.getResponseBean(killTaskResponse, ResponseResultType.KILL_TASKS);
+ break;
+ } catch(Exception ex) {
+ if(ex instanceof TTransportException && attempt < maxAttempts && this.reconnectWithAuroraScheduler()) {
+ continue;
+ }
+ logger.error(ex.getMessage(), ex);
+ throw ex;
}
- } catch(Exception ex) {
- logger.error(ex.getMessage(), ex);
- throw ex;
}
return response;
}
@@ -152,12 +242,21 @@ public class AuroraThriftClient {
*/
public GetJobsResponseBean getJobList(String ownerRole) throws Exception {
GetJobsResponseBean response = null;
- try {
- Response jobListResponse = this.readOnlySchedulerClient.getJobs(ownerRole);
- response = (GetJobsResponseBean) AuroraThriftClientUtil.getResponseBean(jobListResponse, ResponseResultType.GET_JOBS);
- } catch(Exception ex) {
- logger.error(ex.getMessage(), ex);
- throw ex;
+ // retry only after a transport failure that a successful re-connection addresses (e.g. scheduler
+ // leader rotation), and at most maxAttempts times, so a persistently failing scheduler cannot keep
+ // the caller looping; a null response bean is returned as-is instead of triggering another attempt
+ final int maxAttempts = 3;
+ for(int attempt = 1; ; attempt++) {
+ try {
+ Response jobListResponse = this.readOnlySchedulerClient.getJobs(ownerRole);
+ response = (GetJobsResponseBean) AuroraThriftClientUtil.getResponseBean(jobListResponse, ResponseResultType.GET_JOBS);
+ break;
+ } catch(Exception ex) {
+ if(ex instanceof TTransportException && attempt < maxAttempts && this.reconnectWithAuroraScheduler()) {
+ continue;
+ }
+ logger.error(ex.getMessage(), ex);
+ throw ex;
+ }
}
return response;
}
@@ -171,19 +270,28 @@ public class AuroraThriftClient {
*/
public PendingJobReasonBean getPendingReasonForJob(JobKeyBean jobKeyBean) throws Exception {
PendingJobReasonBean response = null;
- try {
- JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
- Set<JobKey> jobKeySet = new HashSet<>();
- jobKeySet.add(jobKey);
-
- TaskQuery query = new TaskQuery();
- query.setJobKeys(jobKeySet);
-
- Response pendingReasonResponse = this.readOnlySchedulerClient.getPendingReason(query);
- response = (PendingJobReasonBean) AuroraThriftClientUtil.getResponseBean(pendingReasonResponse, ResponseResultType.GET_PENDING_JOB_REASON);
- } catch(Exception ex) {
- logger.error(ex.getMessage(), ex);
- throw ex;
+ // retry only after a transport failure that a successful re-connection addresses (e.g. scheduler
+ // leader rotation), and at most maxAttempts times, so a persistently failing scheduler cannot keep
+ // the caller looping; a null response bean is returned as-is instead of triggering another attempt
+ final int maxAttempts = 3;
+ for(int attempt = 1; ; attempt++) {
+ try {
+ JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+ Set<JobKey> jobKeySet = new HashSet<>();
+ jobKeySet.add(jobKey);
+
+ TaskQuery query = new TaskQuery();
+ query.setJobKeys(jobKeySet);
+
+ Response pendingReasonResponse = this.readOnlySchedulerClient.getPendingReason(query);
+ response = (PendingJobReasonBean) AuroraThriftClientUtil.getResponseBean(pendingReasonResponse, ResponseResultType.GET_PENDING_JOB_REASON);
+ break;
+ } catch(Exception ex) {
+ if(ex instanceof TTransportException && attempt < maxAttempts && this.reconnectWithAuroraScheduler()) {
+ continue;
+ }
+ logger.error(ex.getMessage(), ex);
+ throw ex;
+ }
}
return response;
}
@@ -197,21 +305,30 @@ public class AuroraThriftClient {
*/
public JobDetailsResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception {
JobDetailsResponseBean response = null;
- try {
- if(jobKeyBean != null) {
- JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
- Set<JobKey> jobKeySet = new HashSet<>();
- jobKeySet.add(jobKey);
-
- TaskQuery query = new TaskQuery();
- query.setJobKeys(jobKeySet);
-
- Response jobDetailsResponse = this.readOnlySchedulerClient.getTasksStatus(query);
- response = (JobDetailsResponseBean) AuroraThriftClientUtil.getResponseBean(jobDetailsResponse, ResponseResultType.GET_JOB_DETAILS);
+ // nothing to look up: return null as before, instead of spinning forever in the retry loop
+ if(jobKeyBean == null) {
+ return response;
+ }
+
+ // retry only after a transport failure that a successful re-connection addresses (e.g. scheduler
+ // leader rotation), and at most maxAttempts times, so a persistently failing scheduler cannot keep
+ // the caller looping; a null response bean is returned as-is instead of triggering another attempt
+ final int maxAttempts = 3;
+ for(int attempt = 1; ; attempt++) {
+ try {
+ JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+ Set<JobKey> jobKeySet = new HashSet<>();
+ jobKeySet.add(jobKey);
+
+ TaskQuery query = new TaskQuery();
+ query.setJobKeys(jobKeySet);
+
+ Response jobDetailsResponse = this.readOnlySchedulerClient.getTasksStatus(query);
+ response = (JobDetailsResponseBean) AuroraThriftClientUtil.getResponseBean(jobDetailsResponse, ResponseResultType.GET_JOB_DETAILS);
+ break;
+ } catch(Exception ex) {
+ if(ex instanceof TTransportException && attempt < maxAttempts && this.reconnectWithAuroraScheduler()) {
+ continue;
+ }
+ logger.error(ex.getMessage(), ex);
+ throw ex;
}
- } catch(Exception ex) {
- logger.error(ex.getMessage(), ex);
- throw ex;
}
return response;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/946143a7/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
index 7cb03b4..c13ef8f 100644
--- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
+++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
@@ -390,7 +390,7 @@ public class AuroraThriftClientUtil {
// host is reachable
isReachable = true;
} catch(Exception ex) {
- logger.error("Timed-out connecting to URL: " + connectionUrl, ex);
+ logger.error("Timed-out connecting to URL: " + connectionUrl);
}
return isReachable;
}