You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/11/03 18:38:44 UTC

[2/4] airavata git commit: Make AuroraThriftClient a synchronized singleton & Reconnect with Aurora scheduler on leader rotation

Make AuroraThriftClient a synchronized singleton & Reconnect with Aurora scheduler on leader rotation


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/946143a7
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/946143a7
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/946143a7

Branch: refs/heads/develop
Commit: 946143a74c6737ed4adcdbb545148aff7dcab241
Parents: 593331b
Author: Gourav Shenoy <sh...@gmail.com>
Authored: Thu Nov 3 14:12:08 2016 -0400
Committer: Gourav Shenoy <sh...@gmail.com>
Committed: Thu Nov 3 14:12:08 2016 -0400

----------------------------------------------------------------------
 .../cloud/aurora/client/AuroraThriftClient.java | 233 ++++++++++++++-----
 .../aurora/util/AuroraThriftClientUtil.java     |   2 +-
 2 files changed, 176 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/946143a7/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
index 977479b..e955dc5 100644
--- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
+++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
@@ -39,6 +39,7 @@ import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil;
 import org.apache.airavata.cloud.aurora.util.Constants;
 import org.apache.airavata.cloud.aurora.util.ResponseResultType;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,31 +74,101 @@ public class AuroraThriftClient {
 	public static AuroraThriftClient getAuroraThriftClient() throws Exception {
 		try {
 			if(thriftClient == null) {
-				thriftClient = new AuroraThriftClient();
-				
-				// construct connection url for scheduler
-				String auroraHosts = ServerSettings.getAuroraSchedulerHosts();
-				Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout();
-				
-				// check reachable scheduler host
+				synchronized(AuroraThriftClient.class) {
+					if(thriftClient == null) {
+						AuroraThriftClient client = new AuroraThriftClient();
+						
+						// read the configured scheduler hosts (comma separated) and the connect timeout
+						String auroraHosts = ServerSettings.getAuroraSchedulerHosts();
+						Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout();
+						
+						// connect to the first reachable scheduler host
+						if(auroraHosts != null) {
+							for(String auroraHost : auroraHosts.split(",")) {
+								// malformed host string, should be of form <host:port>
+								if(auroraHost.split(":").length != 2) {
+									throw new Exception("Scheduler Host String: " + auroraHost + ", is malformed. Should be of form <hostname:port>!");
+								}
+								
+								// read hostname, port & construct connection-url
+								String hostname = auroraHost.split(":")[0];
+								String port = auroraHost.split(":")[1];
+								String connectionUrl = MessageFormat.format(Constants.AURORA_SCHEDULER_CONNECTION_URL, hostname, port);
+								
+								// create both clients against the first host that answers, then stop probing
+								if(AuroraThriftClientUtil.isSchedulerHostReachable(connectionUrl, connectTimeout)) {
+									client.readOnlySchedulerClient = AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, connectTimeout);
+									client.auroraSchedulerManagerClient = AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, connectTimeout);
+									break;
+								}
+							}
+							// check if scheduler connection successful
+							if(client.auroraSchedulerManagerClient == null || client.readOnlySchedulerClient == null) {
+								throw new Exception("None of the Aurora scheduler hosts were reachable, hence connection not established!");
+							}
+							// publish only a fully connected client; on failure the field stays null so the next call retries
+							thriftClient = client;
+						} else {
+							// aurora hosts not defined in the properties file
+							throw new Exception("Aurora hosts not specified in airavata-server.properties file.");
+						}
+					}
+				}
+			}
+		} catch(Exception ex) {
+			logger.error(ex.getMessage(), ex);
+			throw ex;
+		}
+		return thriftClient;
+	}
+	
+	
+	/**
+	 * Re-creates both scheduler clients against the first reachable host in the configured host list.
+	 * Failures (missing or malformed host settings included) are logged here and not rethrown.
+	 * @return true, if a reachable host was found and the clients were replaced; false otherwise
+	 */
+	private boolean reconnectWithAuroraScheduler() {
+		boolean connectionSuccess = false;
+		
+		try {
+			// read the configured scheduler hosts (comma separated) and the connect timeout
+			String auroraHosts = ServerSettings.getAuroraSchedulerHosts();
+			Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout();
+			
+			// connect to the first reachable scheduler host
+			if(auroraHosts != null) {
 				for(String auroraHost : auroraHosts.split(",")) {
+					// malformed host string, should be of form <host:port>
+					if(auroraHost.split(":").length != 2) {
+						throw new Exception("Scheduler Host String: " + auroraHost + ", is malformed. Should be of form <hostname:port>!");
+					}
+					
+					// read hostname, port & construct connection-url
 					String hostname = auroraHost.split(":")[0];
 					String port = auroraHost.split(":")[1];
 					String connectionUrl = MessageFormat.format(Constants.AURORA_SCHEDULER_CONNECTION_URL, hostname, port);
 					
+					// replace the clients only if this host answers within the timeout
 					if(AuroraThriftClientUtil.isSchedulerHostReachable(connectionUrl, connectTimeout)) {
 						thriftClient.readOnlySchedulerClient = AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, connectTimeout);
 						thriftClient.auroraSchedulerManagerClient = AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, connectTimeout);
+						// first reachable host wins (same as the initial connect): flag success and stop probing
+						connectionSuccess = true;
+						break;
 					}
 				}
+			} else {
+				// aurora hosts not defined in the properties file
+				throw new Exception("Aurora hosts not specified in airavata-server.properties file.");
 			}
 		} catch(Exception ex) {
 			logger.error(ex.getMessage(), ex);
-			throw ex;
 		}
-		return thriftClient;
+		return connectionSuccess;
 	}
 	
+	
 	/**
 	 * Creates the job.
 	 *
@@ -107,16 +178,26 @@ public class AuroraThriftClient {
 	 */
 	public ResponseBean createJob(JobConfigBean jobConfigBean) throws Exception {
 		ResponseBean response = null;
-		try {
-			if(jobConfigBean != null) {
-				JobConfiguration jobConfig = AuroraThriftClientUtil.getAuroraJobConfig(jobConfigBean);
-				Response createJobResponse = this.auroraSchedulerManagerClient.createJob(jobConfig);
-				response = AuroraThriftClientUtil.getResponseBean(createJobResponse, ResponseResultType.CREATE_JOB);
+		// retry until a response is obtained; a null job config makes no call and returns null instead of spinning
+		while(response == null && jobConfigBean != null) {
+			try {
+				if(jobConfigBean != null) {
+					JobConfiguration jobConfig = AuroraThriftClientUtil.getAuroraJobConfig(jobConfigBean);
+					Response createJobResponse = this.auroraSchedulerManagerClient.createJob(jobConfig);
+					response = AuroraThriftClientUtil.getResponseBean(createJobResponse, ResponseResultType.CREATE_JOB);
+				}
+			} catch(Exception ex) {
+				if (ex instanceof TTransportException) {
+					// transport failure: retry the command only if re-connection succeeds
+					if (this.reconnectWithAuroraScheduler()) {
+						continue;
+					}
+				}
+				logger.error(ex.getMessage(), ex);
+				throw ex;
 			}
-		} catch(Exception ex) {
-			logger.error(ex.getMessage(), ex);
-			throw ex;
 		}
+		
 		return response;
 	}
 	
@@ -130,15 +211,24 @@ public class AuroraThriftClient {
 	 */
 	public ResponseBean killTasks(JobKeyBean jobKeyBean, Set<Integer> instances) throws Exception {
 		ResponseBean response = null;
-		try {
-			if(jobKeyBean != null) {
-				JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
-				Response killTaskResponse = this.auroraSchedulerManagerClient.killTasks(jobKey, instances);
-				response = AuroraThriftClientUtil.getResponseBean(killTaskResponse, ResponseResultType.KILL_TASKS);
+		// retry until a response is obtained; a null job key makes no call and returns null instead of spinning
+		while(response == null && jobKeyBean != null) {
+			try {
+				if(jobKeyBean != null) {
+					JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+					Response killTaskResponse = this.auroraSchedulerManagerClient.killTasks(jobKey, instances);
+					response = AuroraThriftClientUtil.getResponseBean(killTaskResponse, ResponseResultType.KILL_TASKS);
+				}
+			} catch(Exception ex) {
+				if (ex instanceof TTransportException) {
+					// transport failure: retry the command only if re-connection succeeds
+					if (this.reconnectWithAuroraScheduler()) {
+						continue;
+					}
+				}
+				logger.error(ex.getMessage(), ex);
+				throw ex;
 			}
-		} catch(Exception ex) {
-			logger.error(ex.getMessage(), ex);
-			throw ex;
 		}
 		return response;
 	}
@@ -152,12 +242,21 @@ public class AuroraThriftClient {
 	 */
 	public GetJobsResponseBean getJobList(String ownerRole) throws Exception {
 		GetJobsResponseBean response = null;
-		try {
-				Response jobListResponse = this.readOnlySchedulerClient.getJobs(ownerRole);
-				response = (GetJobsResponseBean) AuroraThriftClientUtil.getResponseBean(jobListResponse, ResponseResultType.GET_JOBS);
-		} catch(Exception ex) {
-			logger.error(ex.getMessage(), ex);
-			throw ex;
+		// loop until a response is obtained; NOTE(review): no retry cap while re-connection keeps succeeding
+		while(response == null) {
+			try {
+					Response jobListResponse = this.readOnlySchedulerClient.getJobs(ownerRole);
+					response = (GetJobsResponseBean) AuroraThriftClientUtil.getResponseBean(jobListResponse, ResponseResultType.GET_JOBS);
+			} catch(Exception ex) {
+				if (ex instanceof TTransportException) {
+					// transport failure: retry the command only if re-connection succeeds; otherwise log and rethrow
+					if (this.reconnectWithAuroraScheduler()) {
+						continue;
+					}
+				}
+				logger.error(ex.getMessage(), ex);
+				throw ex;
+			}
 		}
 		return response;
 	}
@@ -171,19 +270,28 @@ public class AuroraThriftClient {
 	 */
 	public PendingJobReasonBean getPendingReasonForJob(JobKeyBean jobKeyBean) throws Exception {
 		PendingJobReasonBean response = null;
-		try {
-				JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
-				Set<JobKey> jobKeySet = new HashSet<>();
-				jobKeySet.add(jobKey);
-				
-				TaskQuery query = new TaskQuery();
-				query.setJobKeys(jobKeySet);
-				
-				Response pendingReasonResponse = this.readOnlySchedulerClient.getPendingReason(query);
-				response = (PendingJobReasonBean) AuroraThriftClientUtil.getResponseBean(pendingReasonResponse, ResponseResultType.GET_PENDING_JOB_REASON);
-		} catch(Exception ex) {
-			logger.error(ex.getMessage(), ex);
-			throw ex;
+		// loop until a response is obtained; NOTE(review): no retry cap while re-connection keeps succeeding
+		while(response == null) {
+			try {
+					JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+					Set<JobKey> jobKeySet = new HashSet<>();
+					jobKeySet.add(jobKey);
+					
+					TaskQuery query = new TaskQuery();
+					query.setJobKeys(jobKeySet);
+					
+					Response pendingReasonResponse = this.readOnlySchedulerClient.getPendingReason(query);
+					response = (PendingJobReasonBean) AuroraThriftClientUtil.getResponseBean(pendingReasonResponse, ResponseResultType.GET_PENDING_JOB_REASON);
+			} catch(Exception ex) {
+				if (ex instanceof TTransportException) {
+					// transport failure: retry the command only if re-connection succeeds; otherwise log and rethrow
+					if (this.reconnectWithAuroraScheduler()) {
+						continue;
+					}
+				}
+				logger.error(ex.getMessage(), ex);
+				throw ex;
+			}
 		}
 		return response;
 	}
@@ -197,21 +305,30 @@ public class AuroraThriftClient {
 	 */
 	public JobDetailsResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception {
 		JobDetailsResponseBean response = null;
-		try {
-			if(jobKeyBean != null) {
-				JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
-				Set<JobKey> jobKeySet = new HashSet<>();
-				jobKeySet.add(jobKey);
-				
-				TaskQuery query = new TaskQuery();
-				query.setJobKeys(jobKeySet);
-				
-				Response jobDetailsResponse = this.readOnlySchedulerClient.getTasksStatus(query);
-				response = (JobDetailsResponseBean) AuroraThriftClientUtil.getResponseBean(jobDetailsResponse, ResponseResultType.GET_JOB_DETAILS);
+		// retry until a response is obtained; a null job key makes no call and returns null instead of spinning
+		while(response == null && jobKeyBean != null) {
+			try {
+				if(jobKeyBean != null) {
+					JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+					Set<JobKey> jobKeySet = new HashSet<>();
+					jobKeySet.add(jobKey);
+					
+					TaskQuery query = new TaskQuery();
+					query.setJobKeys(jobKeySet);
+					
+					Response jobDetailsResponse = this.readOnlySchedulerClient.getTasksStatus(query);
+					response = (JobDetailsResponseBean) AuroraThriftClientUtil.getResponseBean(jobDetailsResponse, ResponseResultType.GET_JOB_DETAILS);
+				}
+			} catch(Exception ex) {
+				if (ex instanceof TTransportException) {
+					// transport failure: retry the command only if re-connection succeeds
+					if (this.reconnectWithAuroraScheduler()) {
+						continue;
+					}
+				}
+				logger.error(ex.getMessage(), ex);
+				throw ex;
 			}
-		} catch(Exception ex) {
-			logger.error(ex.getMessage(), ex);
-			throw ex;
 		}
 		return response;
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/946143a7/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
index 7cb03b4..c13ef8f 100644
--- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
+++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
@@ -390,7 +390,7 @@ public class AuroraThriftClientUtil {
 			// host is reachable
 			isReachable = true;
 		} catch(Exception ex) {
-			logger.error("Timed-out connecting to URL: " + connectionUrl, ex);
+			logger.error("Timed-out connecting to URL: " + connectionUrl);
 		}
 		return isReachable;
 	}