You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/11/11 21:12:20 UTC
[16/50] [abbrv] airavata git commit: adding BES provider changes
adding BES provider changes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3f953e02
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3f953e02
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3f953e02
Branch: refs/heads/master
Commit: 3f953e026ab6a5341fe762f6a98fc6807d67ca29
Parents: eb626fa
Author: chathuriw <ka...@gmail.com>
Authored: Fri Oct 31 14:40:50 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Wed Nov 5 11:23:05 2014 -0500
----------------------------------------------------------------------
.../gfac/bes/handlers/AbstractSMSHandler.java | 74 ++--
.../gfac/bes/provider/impl/BESProvider.java | 378 +++++++++----------
.../bes/security/UNICORESecurityContext.java | 4 +-
.../gfac/bes/utils/ApplicationProcessor.java | 212 ++++-------
.../airavata/gfac/core/utils/GFacUtils.java | 23 +-
.../apache/airavata/gfac/ec2/EC2Provider.java | 15 +-
6 files changed, 306 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/3f953e02/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java
index 8f6fcf4..71ca0db 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java
@@ -2,6 +2,7 @@ package org.apache.airavata.gfac.bes.handlers;
import java.util.Properties;
+import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.bes.security.UNICORESecurityContext;
import org.apache.airavata.gfac.bes.security.X509SecurityContext;
@@ -13,6 +14,7 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.GFacHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.schemas.gfac.JobDirectoryModeDocument.JobDirectoryMode;
@@ -43,42 +45,42 @@ public abstract class AbstractSMSHandler implements BESConstants, GFacHandler{
@Override
public void invoke(JobExecutionContext jobExecutionContext)
throws GFacHandlerException {
-
- // if not SMS then not to pass further
-// if(!isSMSEnabled(jobExecutionContext)) return;
-
- initSecurityProperties(jobExecutionContext);
-
+ try {
+ initSecurityProperties(jobExecutionContext);
+ JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ JobSubmissionProtocol protocol = preferredJobSubmissionInterface.getJobSubmissionProtocol();
+ String interfaceId = preferredJobSubmissionInterface.getJobSubmissionInterfaceId();
+ String factoryUrl = null;
+ if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
+ UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
+ factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
+ }
+ storageClient = null;
-
- UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
- .getType();
- String factoryUrl = host.getUnicoreBESEndPointArray()[0];
-
- storageClient = null;
-
- if(!isSMSInstanceExisting(jobExecutionContext)) {
- EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
- eprt.addNewAddress().setStringValue(factoryUrl);
- StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, null);
- try {
- storageClient = storageCreator.createStorage();
- } catch (Exception e2) {
- log.error("Cannot create storage..");
- throw new GFacHandlerException("Cannot create storage..", e2);
+ if (!isSMSInstanceExisting(jobExecutionContext)) {
+ EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
+ eprt.addNewAddress().setStringValue(factoryUrl);
+ StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, null);
+ try {
+ storageClient = storageCreator.createStorage();
+ } catch (Exception e2) {
+ log.error("Cannot create storage..");
+ throw new GFacHandlerException("Cannot create storage..", e2);
+ }
+ jobExecutionContext.setProperty(PROP_SMS_EPR, storageClient.getEPR());
+ } else {
+ EndpointReferenceType eprt = (EndpointReferenceType) jobExecutionContext.getProperty(PROP_SMS_EPR);
+ try {
+ storageClient = new StorageClient(eprt, secProperties);
+ } catch (Exception e) {
+ throw new GFacHandlerException("Cannot create storage..", e);
+ }
}
- jobExecutionContext.setProperty(PROP_SMS_EPR, storageClient.getEPR());
- }
- else {
- EndpointReferenceType eprt = (EndpointReferenceType)jobExecutionContext.getProperty(PROP_SMS_EPR);
- try {
- storageClient = new StorageClient(eprt, secProperties);
- } catch (Exception e) {
- throw new GFacHandlerException("Cannot create storage..", e);
- }
+ dataTransferrer = new DataTransferrer(jobExecutionContext, storageClient);
+ } catch (AppCatalogException e) {
+ throw new GFacHandlerException("Error occurred while retrieving unicore job submission interface..", e);
}
- dataTransferrer = new DataTransferrer(jobExecutionContext, storageClient);
- }
+ }
protected void initSecurityProperties(JobExecutionContext jobExecutionContext) throws GFacHandlerException{
log.debug("Initializing SMSInHandler security properties ..");
@@ -136,9 +138,9 @@ public abstract class AbstractSMSHandler implements BESConstants, GFacHandler{
* of the job execution context.
* */
protected boolean isSMSEnabled(JobExecutionContext jobExecutionContext){
- if(((UnicoreHostType)jobExecutionContext.getApplicationContext().getHostDescription().getType()).getJobDirectoryMode() == JobDirectoryMode.SMS_BYTE_IO) {
- return true;
- }
+// if(((UnicoreHostType)jobExecutionContext.getApplicationContext().getHostDescription().getType()).getJobDirectoryMode() == JobDirectoryMode.SMS_BYTE_IO) {
+// return true;
+// }
return false;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/3f953e02/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index 7ed038a..398f05c 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.bes.provider.impl;
import java.util.Calendar;
import java.util.Map;
+import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.bes.security.UNICORESecurityContext;
@@ -40,6 +41,9 @@ import org.apache.airavata.gfac.core.provider.AbstractProvider;
import org.apache.airavata.gfac.core.provider.GFacProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.UnicoreHostType;
@@ -101,209 +105,165 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
public void execute(JobExecutionContext jobExecutionContext)
throws GFacProviderException, GFacException {
- UnicoreHostType host = (UnicoreHostType) jobExecutionContext
- .getApplicationContext().getHostDescription().getType();
-
- String factoryUrl = host.getUnicoreBESEndPointArray()[0];
-
- EndpointReferenceType eprt = EndpointReferenceType.Factory
- .newInstance();
- eprt.addNewAddress().setStringValue(factoryUrl);
-
- // WSUtilities.addServerIdentity(eprt, serverDN);
-
- String userDN = getUserName(jobExecutionContext);
-
- // TODO: to be removed
- if (userDN == null || userDN.equalsIgnoreCase("admin")) {
- userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
- }
-
- StorageClient sc = null;
-
- try {
-
- CreateActivityDocument cad = CreateActivityDocument.Factory
- .newInstance();
- JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
- .newInstance();
-
-// String xlogin = getCNFromUserDN(userDN);
-
- // create storage
- StorageCreator storageCreator = new StorageCreator(secProperties,
- factoryUrl, 5, null);
-
- try {
- sc = storageCreator.createStorage();
- } catch (Exception e2) {
- log.error("Cannot create storage..");
- throw new GFacProviderException("Cannot create storage..", e2);
- }
-
- JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition();
- try {
- jobDefinition = JSDLGenerator.buildJSDLInstance(
- jobExecutionContext, sc.getUrl()).getJobDefinition();
- cad.addNewCreateActivity().addNewActivityDocument()
- .setJobDefinition(jobDefinition);
- log.info("JSDL" + jobDefDoc.toString());
- } catch (Exception e1) {
- throw new GFacProviderException(
- "Cannot generate JSDL instance from the JobExecutionContext.",
- e1);
- }
-
- // upload files if any
- DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
- dt.uploadLocalFiles();
-
- FactoryClient factory = null;
- JobDetails jobDetails = new JobDetails();
-
- try {
- factory = new FactoryClient(eprt, secProperties);
- } catch (Exception e) {
- throw new GFacProviderException(e.getLocalizedMessage(), e);
- }
- CreateActivityResponseDocument response = null;
- try {
- log.info(String.format("Activity Submitting to %s ... \n",
- factoryUrl));
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- response = factory.createActivity(cad);
- log.info(String.format("Activity Submitted to %s \n", factoryUrl));
- } catch (Exception e) {
- throw new GFacProviderException("Cannot create activity.", e);
- }
- EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
-
- log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
-
- // factory.waitWhileActivityIsDone(activityEpr, 1000);
- jobId = WSUtilities.extractResourceID(activityEpr);
- if (jobId == null) {
- jobId = new Long(Calendar.getInstance().getTimeInMillis())
- .toString();
- }
- log.info("JobID: " + jobId);
- jobDetails.setJobID(activityEpr.toString());
- jobDetails.setJobDescription(activityEpr.toString());
-
- jobExecutionContext.setJobDetails(jobDetails);
- try {
- log.info(formatStatusMessage(activityEpr.getAddress()
- .getStringValue(), factory.getActivityStatus(activityEpr)
- .toString()));
-
- jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
- GFacUtils.saveJobStatus(jobExecutionContext, details,JobState.SUBMITTED);
-
- factory.getActivityStatus(activityEpr);
- log.info(formatStatusMessage(activityEpr.getAddress()
- .getStringValue(), factory.getActivityStatus(activityEpr)
- .toString()));
-
- // TODO publish the status messages to the message bus
- while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
- && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
- && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
-
- ActivityStatusType activityStatus = null;
- try {
- activityStatus = getStatus(factory, activityEpr);
- JobState applicationJobStatus = getApplicationJobStatus(activityStatus);
- String jobStatusMessage = "Status of job " + jobId + "is "
- + applicationJobStatus;
- GFacUtils.updateJobStatus(jobExecutionContext, jobDetails,
- applicationJobStatus);
-
- jobExecutionContext.getNotifier().publish(
- new StatusChangeEvent(jobStatusMessage));
-
- // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId,
- // applicationJobStatus);
- } catch (UnknownActivityIdentifierFault e) {
- throw new GFacProviderException(e.getMessage(),
- e.getCause());
- }
-
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- }
- continue;
- }
- }catch(Exception e) {
- throw new GFacProviderException(e.getMessage(),
- e.getCause());
-
- }
-
- ActivityStatusType activityStatus = null;
- try {
- activityStatus = getStatus(factory, activityEpr);
- log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
- ActivityClient activityClient;
- activityClient = new ActivityClient(activityEpr,secProperties);
- dt.setStorageClient(activityClient.getUspaceClient());
- } catch (Exception e1) {
- throw new GFacProviderException(e1.getMessage(),
- e1.getCause());
- }
-
-
-
- if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
- String error = activityStatus.getFault().getFaultcode()
- .getLocalPart()
- + "\n"
- + activityStatus.getFault().getFaultstring()
- + "\n EXITCODE: " + activityStatus.getExitCode();
- log.info(error);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- }
- dt.downloadStdOuts();
- } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
- JobState applicationJobStatus = JobState.CANCELED;
- String jobStatusMessage = "Status of job " + jobId + "is "
- + applicationJobStatus;
- jobExecutionContext.getNotifier().publish(
- new StatusChangeEvent(jobStatusMessage));
- GFacUtils.updateJobStatus(jobExecutionContext, jobDetails,
- applicationJobStatus);
- throw new GFacProviderException(
- jobExecutionContext.getExperimentID() + "Job Canceled");
- }
-
- else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- }
- if (activityStatus.getExitCode() == 0) {
- dt.downloadRemoteFiles();
- } else {
- dt.downloadStdOuts();
- }
- }
-
- } finally {
- // destroy sms instance
- try {
- if (sc != null) {
- sc.destroy();
- }
- } catch (Exception e) {
- log.warn(
- "Cannot destroy temporary SMS instance:" + sc.getUrl(),
- e);
- }
- }
-
- }
+ StorageClient sc = null;
+ try {
+ JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ JobSubmissionProtocol protocol = preferredJobSubmissionInterface.getJobSubmissionProtocol();
+ String interfaceId = preferredJobSubmissionInterface.getJobSubmissionInterfaceId();
+ String factoryUrl = null;
+ if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
+ UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
+ factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
+ }
+ EndpointReferenceType eprt = EndpointReferenceType.Factory
+ .newInstance();
+ eprt.addNewAddress().setStringValue(factoryUrl);
+ String userDN = getUserName(jobExecutionContext);
+
+ // TODO: to be removed
+ if (userDN == null || userDN.equalsIgnoreCase("admin")) {
+ userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+ }
+ CreateActivityDocument cad = CreateActivityDocument.Factory
+ .newInstance();
+ JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+ .newInstance();
+
+ // create storage
+ StorageCreator storageCreator = new StorageCreator(secProperties,
+ factoryUrl, 5, null);
+ sc = storageCreator.createStorage();
+
+ JobDefinitionType jobDefinition = JSDLGenerator.buildJSDLInstance(
+ jobExecutionContext, sc.getUrl()).getJobDefinition();
+ cad.addNewCreateActivity().addNewActivityDocument()
+ .setJobDefinition(jobDefinition);
+ log.info("JSDL" + jobDefDoc.toString());
+
+ // upload files if any
+ DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
+ dt.uploadLocalFiles();
+
+ JobDetails jobDetails = new JobDetails();
+ FactoryClient factory = new FactoryClient(eprt, secProperties);
+
+ log.info(String.format("Activity Submitting to %s ... \n",
+ factoryUrl));
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ CreateActivityResponseDocument response = factory.createActivity(cad);
+ log.info(String.format("Activity Submitted to %s \n", factoryUrl));
+
+ EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+ log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+ // factory.waitWhileActivityIsDone(activityEpr, 1000);
+ jobId = WSUtilities.extractResourceID(activityEpr);
+ if (jobId == null) {
+ jobId = new Long(Calendar.getInstance().getTimeInMillis())
+ .toString();
+ }
+ log.info("JobID: " + jobId);
+ jobDetails.setJobID(activityEpr.toString());
+ jobDetails.setJobDescription(activityEpr.toString());
+
+ jobExecutionContext.setJobDetails(jobDetails);
+ log.info(formatStatusMessage(activityEpr.getAddress()
+ .getStringValue(), factory.getActivityStatus(activityEpr)
+ .toString()));
+
+ jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SUBMITTED);
+
+ factory.getActivityStatus(activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress()
+ .getStringValue(), factory.getActivityStatus(activityEpr)
+ .toString()));
+
+ // TODO publish the status messages to the message bus
+ while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
+
+ ActivityStatusType activityStatus = getStatus(factory, activityEpr);
+ JobState applicationJobStatus = getApplicationJobStatus(activityStatus);
+ String jobStatusMessage = "Status of job " + jobId + "is "
+ + applicationJobStatus;
+ GFacUtils.updateJobStatus(jobExecutionContext, jobDetails,
+ applicationJobStatus);
+
+ jobExecutionContext.getNotifier().publish(
+ new StatusChangeEvent(jobStatusMessage));
+
+ // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId,
+ // applicationJobStatus);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+
+ ActivityStatusType activityStatus = null;
+ activityStatus = getStatus(factory, activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
+ ActivityClient activityClient;
+ activityClient = new ActivityClient(activityEpr, secProperties);
+ dt.setStorageClient(activityClient.getUspaceClient());
+
+ if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+ String error = activityStatus.getFault().getFaultcode()
+ .getLocalPart()
+ + "\n"
+ + activityStatus.getFault().getFaultstring()
+ + "\n EXITCODE: " + activityStatus.getExitCode();
+ log.info(error);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ dt.downloadStdOuts();
+ } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+ JobState applicationJobStatus = JobState.CANCELED;
+ String jobStatusMessage = "Status of job " + jobId + "is "
+ + applicationJobStatus;
+ jobExecutionContext.getNotifier().publish(
+ new StatusChangeEvent(jobStatusMessage));
+ GFacUtils.updateJobStatus(jobExecutionContext, jobDetails,
+ applicationJobStatus);
+ throw new GFacProviderException(
+ jobExecutionContext.getExperimentID() + "Job Canceled");
+ } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ if (activityStatus.getExitCode() == 0) {
+ dt.downloadRemoteFiles();
+ } else {
+ dt.downloadStdOuts();
+ }
+ }
+ } catch (AppCatalogException e) {
+ log.error("Error while retrieving UNICORE job submission..");
+ throw new GFacProviderException("Error while retrieving UNICORE job submission..", e);
+ } catch (Exception e) {
+ log.error("Cannot create storage..");
+ throw new GFacProviderException("Cannot create storage..", e);
+ } finally {
+ // destroy sms instance
+ try {
+ if (sc != null) {
+ sc.destroy();
+ }
+ } catch (Exception e) {
+ log.warn(
+ "Cannot destroy temporary SMS instance:" + sc.getUrl(),
+ e);
+ }
+ }
+
+ }
private JobState getApplicationJobStatus(ActivityStatusType activityStatus) {
if (activityStatus == null) {
@@ -368,10 +328,14 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
// initSecurityProperties(jobExecutionContext);
EndpointReferenceType eprt = EndpointReferenceType.Factory
.parse(activityEpr);
- UnicoreHostType host = (UnicoreHostType) jobExecutionContext
- .getApplicationContext().getHostDescription().getType();
-
- String factoryUrl = host.getUnicoreBESEndPointArray()[0];
+ JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ JobSubmissionProtocol protocol = preferredJobSubmissionInterface.getJobSubmissionProtocol();
+ String interfaceId = preferredJobSubmissionInterface.getJobSubmissionInterfaceId();
+ String factoryUrl = null;
+ if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
+ UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
+ factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
+ }
EndpointReferenceType epr = EndpointReferenceType.Factory
.newInstance();
epr.addNewAddress().setStringValue(factoryUrl);
http://git-wip-us.apache.org/repos/asf/airavata/blob/3f953e02/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
index 7285c2c..855335f 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
@@ -38,7 +38,7 @@ public class UNICORESecurityContext extends X509SecurityContext {
* @return an instance of the default client configuration
* @throws GFacException
* @throws ApplicationSettingsException
- * @throws GFacProviderException
+ * @throws GFacException, ApplicationSettingsException
*/
public DefaultClientConfiguration getDefaultConfiguration() throws GFacException, ApplicationSettingsException {
try{
@@ -69,7 +69,7 @@ public class UNICORESecurityContext extends X509SecurityContext {
* @param caKeyPath
* @param caKeyPwd
* @return
- * @throws GFacProviderException
+ * @throws GFacException
*/
public DefaultClientConfiguration getServerSignedConfiguration(String userID, String userDN, String caCertPath, String caKeyPath, String caKeyPwd) throws GFacException {
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/3f953e02/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java
index d624340..ee58565 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java
@@ -22,21 +22,18 @@
package org.apache.airavata.gfac.bes.utils;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
import org.apache.airavata.schemas.gfac.ExtendedKeyValueType;
import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.apache.airavata.schemas.gfac.JobTypeType;
-import org.apache.airavata.schemas.gfac.NameValuePairType;
import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationType;
import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
-import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.EnvironmentType;
import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.FileNameType;
import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.UserNameType;
import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.NumberOfProcessesType;
import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ProcessesPerHostType;
import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ThreadsPerProcessType;
-import java.io.File;
-
public class ApplicationProcessor {
@@ -47,40 +44,50 @@ public class ApplicationProcessor {
userName = "CN=zdv575, O=Ultrascan Gateway, C=DE";
}
- HpcApplicationDeploymentType appDepType = (HpcApplicationDeploymentType) context
- .getApplicationContext().getApplicationDeploymentDescription()
- .getType();
-
- createGenericApplication(value, appDepType);
-
- if (appDepType.getApplicationEnvironmentArray().length > 0) {
- createApplicationEnvironment(value,
- appDepType.getApplicationEnvironmentArray(), appDepType);
- }
+ ApplicationDeploymentDescription appDep= context.getApplicationContext().getApplicationDeploymentDescription();
+ String appname = context.getApplicationContext().getApplicationInterfaceDescription().getApplicationName();
+ ApplicationParallelismType parallelism = appDep.getParallelism();
-
- if (appDepType.getExecutableLocation() != null) {
+ ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
+ appType.setApplicationName(appname);
+ JSDLUtils.getOrCreateJobIdentification(value).setJobName(appname);
+
+// if (appDep.getSetEnvironment().size() > 0) {
+// createApplicationEnvironment(value, appDep.getSetEnvironment(), parallelism);
+// }
+//
+ String stdout = context.getStandardOutput();
+ String stderr = context.getStandardError();
+ if (appDep.getExecutablePath() != null) {
FileNameType fNameType = FileNameType.Factory.newInstance();
- fNameType.setStringValue(appDepType.getExecutableLocation());
- if(isParallelJob(appDepType)) {
+ fNameType.setStringValue(appDep.getExecutablePath());
+ if(parallelism.equals(ApplicationParallelismType.MPI) || parallelism.equals(ApplicationParallelismType.OPENMP_MPI)) {
JSDLUtils.getOrCreateSPMDApplication(value).setExecutable(fNameType);
- JSDLUtils.getSPMDApplication(value).setSPMDVariation(getSPMDVariation(appDepType));
-
- if(getValueFromMap(appDepType, JSDLUtils.NUMBEROFPROCESSES)!=null){
+ if (parallelism.equals(ApplicationParallelismType.OPENMP_MPI)){
+ JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.OpenMPI.value());
+ }else if (parallelism.equals(ApplicationParallelismType.MPI)){
+ JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.MPI.value());
+ }
+
+ int totalCPUCount = context.getTaskData().getTaskScheduling().getTotalCPUCount();
+ if(totalCPUCount > 0){
NumberOfProcessesType num = NumberOfProcessesType.Factory.newInstance();
- num.setStringValue(getValueFromMap(appDepType, JSDLUtils.NUMBEROFPROCESSES));
+ num.setStringValue(String.valueOf(totalCPUCount));
JSDLUtils.getSPMDApplication(value).setNumberOfProcesses(num);
}
-
- if(getValueFromMap(appDepType, JSDLUtils.PROCESSESPERHOST)!=null){
- ProcessesPerHostType pph = ProcessesPerHostType.Factory.newInstance();
- pph.setStringValue(getValueFromMap(appDepType, JSDLUtils.PROCESSESPERHOST));
- JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph);
- }
-
- if(getValueFromMap(appDepType, JSDLUtils.THREADSPERHOST)!=null){
+
+ int totalNodeCount = context.getTaskData().getTaskScheduling().getNodeCount();
+ if(totalNodeCount > 0){
+ int ppn = totalCPUCount / totalNodeCount;
+ ProcessesPerHostType pph = ProcessesPerHostType.Factory.newInstance();
+ pph.setStringValue(String.valueOf(ppn));
+ JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph);
+ }
+
+ int totalThreadCount = context.getTaskData().getTaskScheduling().getNumberOfThreads();
+ if(totalThreadCount > 0){
ThreadsPerProcessType tpp = ThreadsPerProcessType.Factory.newInstance();
- tpp.setStringValue(getValueFromMap(appDepType, JSDLUtils.THREADSPERHOST));
+ tpp.setStringValue(String.valueOf(totalThreadCount));
JSDLUtils.getSPMDApplication(value).setThreadsPerProcess(tpp);
}
@@ -90,6 +97,18 @@ public class ApplicationProcessor {
userNameType.setStringValue(userName);
JSDLUtils.getSPMDApplication(value).setUserName(userNameType);
}
+ if (stdout != null){
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stdout);
+ JSDLUtils.getOrCreateSPMDApplication(value).setOutput(fName);
+ }
+ if (stderr != null){
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stderr);
+ JSDLUtils.getOrCreateSPMDApplication(value).setError(fName);
+ }
+
+
}
else {
JSDLUtils.getOrCreatePOSIXApplication(value).setExecutable(fNameType);
@@ -98,17 +117,18 @@ public class ApplicationProcessor {
userNameType.setStringValue(userName);
JSDLUtils.getOrCreatePOSIXApplication(value).setUserName(userNameType);
}
+ if (stdout != null){
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stdout);
+ JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(fName);
+ }
+ if (stderr != null){
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stderr);
+ JSDLUtils.getOrCreatePOSIXApplication(value).setError(fName);
+ }
}
}
-
-
- String stdout = (appDepType.getStandardOutput() != null) ? new File(appDepType.getStandardOutput()).getName(): "stdout";
- ApplicationProcessor.setApplicationStdOut(value, appDepType, stdout);
-
-
- String stderr = (appDepType.getStandardError() != null) ? new File(appDepType.getStandardError()).getName() : "stderr";
- ApplicationProcessor.setApplicationStdErr(value, appDepType, stderr);
-
}
public static String getUserNameFromContext(JobExecutionContext jobContext) {
@@ -117,79 +137,7 @@ public class ApplicationProcessor {
//FIXME: Discuss to get user and change this
return "admin";
}
- public static boolean isParallelJob(HpcApplicationDeploymentType appDepType) {
-
- boolean isParallel = false;
-
- if (appDepType.getJobType() != null) {
- // TODO set data output directory
- int status = appDepType.getJobType().intValue();
-
- switch (status) {
- // TODO: this check should be done outside this class
- case JobTypeType.INT_MPI:
- case JobTypeType.INT_OPEN_MP:
- isParallel = true;
- break;
-
- case JobTypeType.INT_SERIAL:
- case JobTypeType.INT_SINGLE:
- isParallel = false;
- break;
- default:
- isParallel = false;
- break;
- }
- }
- return isParallel;
- }
-
-
- public static void createApplicationEnvironment(JobDefinitionType value, NameValuePairType[] nameValuePairs, HpcApplicationDeploymentType appDepType) {
-
- if(isParallelJob(appDepType)) {
- for (NameValuePairType nv : nameValuePairs) {
- EnvironmentType envType = JSDLUtils.getOrCreateSPMDApplication(value).addNewEnvironment();
- envType.setName(nv.getName());
- envType.setStringValue(nv.getValue());
- }
- }
- else {
- for (NameValuePairType nv : nameValuePairs) {
- EnvironmentType envType = JSDLUtils.getOrCreatePOSIXApplication(value).addNewEnvironment();
- envType.setName(nv.getName());
- envType.setStringValue(nv.getValue());
- }
- }
-
- }
-
-
- public static String getSPMDVariation (HpcApplicationDeploymentType appDepType) {
-
- String variation = null;
-
- if (appDepType.getJobType() != null) {
- // TODO set data output directory
- int status = appDepType.getJobType().intValue();
-
- switch (status) {
- // TODO: this check should be done outside this class
- case JobTypeType.INT_MPI:
- variation = SPMDVariations.MPI.value();
- break;
-
- case JobTypeType.INT_OPEN_MP:
- variation = SPMDVariations.OpenMPI.value();
- break;
-
- }
- }
- return variation;
- }
-
-
public static void addApplicationArgument(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stringPrm) {
if(isParallelJob(appDepType))
JSDLUtils.getOrCreateSPMDApplication(value)
@@ -200,24 +148,6 @@ public class ApplicationProcessor {
}
- public static void setApplicationStdErr(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stderr) {
- FileNameType fName = FileNameType.Factory.newInstance();
- fName.setStringValue(stderr);
- if (isParallelJob(appDepType))
- JSDLUtils.getOrCreateSPMDApplication(value).setError(fName);
- else
- JSDLUtils.getOrCreatePOSIXApplication(value).setError(fName);
- }
-
- public static void setApplicationStdOut(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stderr) {
- FileNameType fName = FileNameType.Factory.newInstance();
- fName.setStringValue(stderr);
- if (isParallelJob(appDepType))
- JSDLUtils.getOrCreateSPMDApplication(value).setOutput(fName);
- else
- JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(fName);
- }
-
public static String getApplicationStdOut(JobDefinitionType value, HpcApplicationDeploymentType appDepType) throws RuntimeException {
if (isParallelJob(appDepType)) return JSDLUtils.getOrCreateSPMDApplication(value).getOutput().getStringValue();
else return JSDLUtils.getOrCreatePOSIXApplication(value).getOutput().getStringValue();
@@ -228,18 +158,14 @@ public class ApplicationProcessor {
else return JSDLUtils.getOrCreatePOSIXApplication(value).getError().getStringValue();
}
- public static void createGenericApplication(JobDefinitionType value, HpcApplicationDeploymentType appDepType) {
- if (appDepType.getApplicationName() != null) {
- ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
- String appName = appDepType.getApplicationName()
- .getStringValue();
- appType.setApplicationName(appName);
- JSDLUtils.getOrCreateJobIdentification(value).setJobName(appName);
- }
- }
-
-
- public static String getValueFromMap(HpcApplicationDeploymentType appDepType, String name) {
+ public static void createGenericApplication(JobDefinitionType value, String appName) {
+ ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
+ appType.setApplicationName(appName);
+ JSDLUtils.getOrCreateJobIdentification(value).setJobName(appName);
+ }
+
+
+ public static String getValueFromMap(HpcApplicationDeploymentType appDepType, String name) {
ExtendedKeyValueType[] extended = appDepType.getKeyValuePairsArray();
for(ExtendedKeyValueType e: extended) {
if(e.getName().equalsIgnoreCase(name)) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/3f953e02/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index ff6f2c2..b38808b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -39,7 +39,9 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.gfac.core.states.GfacPluginState;
+import org.apache.airavata.model.appcatalog.computeresource.GlobusJobSubmission;
import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.model.workspace.experiment.DataType;
@@ -1258,21 +1260,34 @@ public class GFacUtils {
AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
}catch (Exception e){
- String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+ String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
log.error(errorMsg, e);
throw new AppCatalogException(errorMsg, e);
}
}
- public static UnicoreJobSubmission getJobSubmission (String submissionId) throws AppCatalogException{
+ public static GlobusJobSubmission getGlobusJobSubmission (String submissionId) throws AppCatalogException{
+ return null;
+// try {
+// AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+// return appCatalog.getComputeResource().getGlobus(submissionId);
+// }catch (Exception e){
+// String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+// log.error(errorMsg, e);
+// throw new AppCatalogException(errorMsg, e);
+// }
+ }
+
+ public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{
try {
AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
- return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
+ return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
}catch (Exception e){
- String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+ String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
log.error(errorMsg, e);
throw new AppCatalogException(errorMsg, e);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/3f953e02/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
index 940fff3..5c5af53 100644
--- a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
+++ b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
@@ -38,6 +38,7 @@ import org.apache.airavata.gfac.core.provider.utils.ProviderUtils;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.ec2.util.AmazonEC2Util;
import org.apache.airavata.gfac.ec2.util.EC2ProviderUtil;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.Ec2ApplicationDeploymentType;
@@ -90,7 +91,7 @@ public class EC2Provider extends AbstractProvider {
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException{
if (jobExecutionContext != null) {
- jobId="EC2_"+jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress()+"_"+Calendar.getInstance().getTimeInMillis();
+ jobId="EC2_"+jobExecutionContext.getHostName()+"_"+Calendar.getInstance().getTimeInMillis();
if (jobExecutionContext.getSecurityContext(AmazonSecurityContext.AMAZON_SECURITY_CONTEXT)
instanceof AmazonSecurityContext) {
this.amazonSecurityContext = (AmazonSecurityContext) jobExecutionContext.
@@ -156,10 +157,9 @@ public class EC2Provider extends AbstractProvider {
try
{
String outParamName;
- OutputParameterType[] outputParametersArray = jobExecutionContext.getApplicationContext().
- getServiceDescription().getType().getOutputParametersArray();
- if(outputParametersArray != null) {
- outParamName = outputParametersArray[0].getParameterName();
+ List<OutputDataObjectType> outputs = jobExecutionContext.getApplicationContext().getApplicationInterfaceDescription().getApplicationOutputs();
+ if(outputs != null && !outputs.isEmpty()) {
+ outParamName = outputs.get(0).getName();
} else {
throw new GFacProviderException("Output parameter name is not set. Therefore, not being able " +
"to filter the job result from standard out ");
@@ -217,11 +217,10 @@ public class EC2Provider extends AbstractProvider {
executionResult = executionResult.replace("\r","").replace("\n","");
log.info("Result of the job : " + executionResult);
- for(OutputParameterType outparamType : outputParametersArray){
+ for(OutputDataObjectType outparamType : outputs){
/* Assuming that there is just a single result. If you want to add more results, update the necessary
logic below */
- String paramName = outparamType.getParameterName();
- ActualParameter outParam = new ActualParameter();
+ String paramName = outparamType.getName();
outParam.getType().changeType(StringParameterType.type);
((StringParameterType) outParam.getType()).setValue(executionResult);
jobExecutionContext.getOutMessageContext().addParameter(paramName, outParam);