You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/11/11 21:12:53 UTC
[49/50] [abbrv] airavata git commit: merge changes of master -
AIRAVATA-1511
merge changes of master - AIRAVATA-1511
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/65ad5860
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/65ad5860
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/65ad5860
Branch: refs/heads/master
Commit: 65ad58606fdd4d4a8a1aa5c11a0bc7bdc1f6ac9c
Parents: 3693892 f7de359
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Tue Nov 11 11:31:18 2014 -0500
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Tue Nov 11 11:31:18 2014 -0500
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 21 +-
.../tools/RegisterSampleApplications.java | 9 +-
.../resources/schemas/GFacParameterTypes.xsd | 2 +-
.../java/src/main/assembly/bin-assembly.xml | 280 ++++++++++---------
.../server/src/main/assembly/bin-assembly.xml | 25 +-
.../gfac/bes/provider/impl/BESProvider.java | 206 ++++++++++++++
.../bes/security/UNICORESecurityContext.java | 6 +-
.../gfac/bes/utils/DataTransferrer.java | 53 +++-
.../airavata/gfac/bes/utils/JSDLUtils.java | 6 +-
.../gfac/bes/utils/UASDataStagingProcessor.java | 73 ++---
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 61 ++--
.../airavata/gfac/monitor/util/CommonUtils.java | 7 +-
.../registry/jpa/impl/ExperimentRegistry.java | 7 +-
13 files changed, 525 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --cc airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 2d7768b,05395e6..dbb4a0c
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@@ -56,10 -55,11 +58,10 @@@ public class CreateLaunchExperiment
private static final String DEFAULT_GATEWAY = "default.registry.gateway";
private static Airavata.Client airavataClient;
- private static String echoAppId = "Echo_1869465f-f002-43a9-b243-c091f63ab059";
- private static String wrfAppId = "WRF_a458df70-6808-4d5d-ae32-c49082f2a6cc";
- private static String amberAppId = "Amber_1b99f73b-a88d-44e3-b04e-4f56ba95ed6f";
+ private static String echoAppId = "Echo_636b4530-6fb2-4c9e-998a-b41e648aa70f";
+ private static String wrfAppId = "WRF_d41bdc86-e280-4eb6-a045-708f69a8c116";
+ private static String amberAppId = "Amber_b23ee051-90d6-4892-827e-622a2f6c95ee";
-
private static String localHost = "localhost";
private static String trestlesHostName = "trestles.sdsc.xsede.org";
private static String unicoreHostName = "fsd-cloud15.zam.kfa-juelich.de";
@@@ -213,12 -213,17 +215,17 @@@
input.setType(DataType.STRING);
input.setValue("Echoed_Output=Hello World");
exInputs.add(input);
- DataObjectType i2 = new DataObjectType();
- i2.setKey("Input_to_Echo1");
++ InputDataObjectType i2 = new InputDataObjectType();
++ i2.setName("Input_to_Echo1");
+ i2.setType(DataType.URI);
+ i2.setValue("http://shrib.com/22QmrrX4");
+ exInputs.add(i2);
- List<DataObjectType> exOut = new ArrayList<DataObjectType>();
- DataObjectType output = new DataObjectType();
- output.setKey("Echoed_Output");
+ List<OutputDataObjectType> exOut = new ArrayList<OutputDataObjectType>();
+ OutputDataObjectType output = new OutputDataObjectType();
+ output.setName("Echoed_Output");
output.setType(DataType.STRING);
- output.setValue("");
+ output.setValue("22QmrrX4");
exOut.add(output);
http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/tools/RegisterSampleApplications.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index 398f05c,044ffa2..964e6d1
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@@ -105,165 -101,209 +105,371 @@@ public class BESProvider extends Abstra
public void execute(JobExecutionContext jobExecutionContext)
throws GFacProviderException, GFacException {
++<<<<<<< HEAD
+ StorageClient sc = null;
+ try {
+ JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ JobSubmissionProtocol protocol = preferredJobSubmissionInterface.getJobSubmissionProtocol();
+ String interfaceId = preferredJobSubmissionInterface.getJobSubmissionInterfaceId();
+ String factoryUrl = null;
+ if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
+ UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
+ factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
+ }
+ EndpointReferenceType eprt = EndpointReferenceType.Factory
+ .newInstance();
+ eprt.addNewAddress().setStringValue(factoryUrl);
+ String userDN = getUserName(jobExecutionContext);
+
+ // TODO: to be removed
+ if (userDN == null || userDN.equalsIgnoreCase("admin")) {
+ userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+ }
+ CreateActivityDocument cad = CreateActivityDocument.Factory
+ .newInstance();
+ JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+ .newInstance();
+
+ // create storage
+ StorageCreator storageCreator = new StorageCreator(secProperties,
+ factoryUrl, 5, null);
+ sc = storageCreator.createStorage();
+
+ JobDefinitionType jobDefinition = JSDLGenerator.buildJSDLInstance(
+ jobExecutionContext, sc.getUrl()).getJobDefinition();
+ cad.addNewCreateActivity().addNewActivityDocument()
+ .setJobDefinition(jobDefinition);
+ log.info("JSDL" + jobDefDoc.toString());
+
+ // upload files if any
+ DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
+ dt.uploadLocalFiles();
+
+ JobDetails jobDetails = new JobDetails();
+ FactoryClient factory = new FactoryClient(eprt, secProperties);
+
+ log.info(String.format("Activity Submitting to %s ... \n",
+ factoryUrl));
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ CreateActivityResponseDocument response = factory.createActivity(cad);
+ log.info(String.format("Activity Submitted to %s \n", factoryUrl));
+
+ EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+ log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+ // factory.waitWhileActivityIsDone(activityEpr, 1000);
+ jobId = WSUtilities.extractResourceID(activityEpr);
+ if (jobId == null) {
+ jobId = new Long(Calendar.getInstance().getTimeInMillis())
+ .toString();
+ }
+ log.info("JobID: " + jobId);
+ jobDetails.setJobID(activityEpr.toString());
+ jobDetails.setJobDescription(activityEpr.toString());
+
+ jobExecutionContext.setJobDetails(jobDetails);
+ log.info(formatStatusMessage(activityEpr.getAddress()
+ .getStringValue(), factory.getActivityStatus(activityEpr)
+ .toString()));
+
+ jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SUBMITTED);
+
+ factory.getActivityStatus(activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress()
+ .getStringValue(), factory.getActivityStatus(activityEpr)
+ .toString()));
+
+ // TODO publish the status messages to the message bus
+ while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
+
+ ActivityStatusType activityStatus = getStatus(factory, activityEpr);
+ JobState applicationJobStatus = getApplicationJobStatus(activityStatus);
+ String jobStatusMessage = "Status of job " + jobId + "is "
+ + applicationJobStatus;
+ GFacUtils.updateJobStatus(jobExecutionContext, jobDetails,
+ applicationJobStatus);
+
+ jobExecutionContext.getNotifier().publish(
+ new StatusChangeEvent(jobStatusMessage));
+
+ // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId,
+ // applicationJobStatus);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+
+ ActivityStatusType activityStatus = null;
+ activityStatus = getStatus(factory, activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
+ ActivityClient activityClient;
+ activityClient = new ActivityClient(activityEpr, secProperties);
+ dt.setStorageClient(activityClient.getUspaceClient());
+
+ if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+ String error = activityStatus.getFault().getFaultcode()
+ .getLocalPart()
+ + "\n"
+ + activityStatus.getFault().getFaultstring()
+ + "\n EXITCODE: " + activityStatus.getExitCode();
+ log.info(error);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ dt.downloadStdOuts();
+ } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+ JobState applicationJobStatus = JobState.CANCELED;
+ String jobStatusMessage = "Status of job " + jobId + "is "
+ + applicationJobStatus;
+ jobExecutionContext.getNotifier().publish(
+ new StatusChangeEvent(jobStatusMessage));
+ GFacUtils.updateJobStatus(jobExecutionContext, jobDetails,
+ applicationJobStatus);
+ throw new GFacProviderException(
+ jobExecutionContext.getExperimentID() + "Job Canceled");
+ } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ if (activityStatus.getExitCode() == 0) {
+ dt.downloadRemoteFiles();
+ } else {
+ dt.downloadStdOuts();
+ }
+ }
+ } catch (AppCatalogException e) {
+ log.error("Error while retrieving UNICORE job submission..");
+ throw new GFacProviderException("Error while retrieving UNICORE job submission..", e);
+ } catch (Exception e) {
+ log.error("Cannot create storage..");
+ throw new GFacProviderException("Cannot create storage..", e);
+ } finally {
+ // destroy sms instance
+ try {
+ if (sc != null) {
+ sc.destroy();
+ }
+ } catch (Exception e) {
+ log.warn(
+ "Cannot destroy temporary SMS instance:" + sc.getUrl(),
+ e);
+ }
+ }
+
+ }
++=======
+ UnicoreHostType host = (UnicoreHostType) jobExecutionContext
+ .getApplicationContext().getHostDescription().getType();
+
+ String factoryUrl = host.getUnicoreBESEndPointArray()[0];
+
+ EndpointReferenceType eprt = EndpointReferenceType.Factory
+ .newInstance();
+ eprt.addNewAddress().setStringValue(factoryUrl);
+
+ // WSUtilities.addServerIdentity(eprt, serverDN);
+
+ String userDN = getUserName(jobExecutionContext);
+
+ // TODO: to be removed
+ if (userDN == null || userDN.equalsIgnoreCase("admin")) {
+ userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+ }
+
+ StorageClient sc = null;
+
+ try {
+
+ CreateActivityDocument cad = CreateActivityDocument.Factory
+ .newInstance();
+ JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+ .newInstance();
+
+ // String xlogin = getCNFromUserDN(userDN);
+
+ // create storage
+ StorageCreator storageCreator = new StorageCreator(secProperties,
+ factoryUrl, 5, null);
+
+ try {
+ sc = storageCreator.createStorage();
+ } catch (Exception e2) {
+ log.error("Cannot create storage..");
+ throw new GFacProviderException("Cannot create storage..", e2);
+ }
+
+ JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition();
+ try {
+ jobDefinition = JSDLGenerator.buildJSDLInstance(
+ jobExecutionContext, sc.getUrl()).getJobDefinition();
+ cad.addNewCreateActivity().addNewActivityDocument()
+ .setJobDefinition(jobDefinition);
+ log.info("JSDL" + jobDefDoc.toString());
+ } catch (Exception e1) {
+ throw new GFacProviderException(
+ "Cannot generate JSDL instance from the JobExecutionContext.",
+ e1);
+ }
+
+ // upload files if any
+ DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
+ dt.uploadLocalFiles();
+
+ FactoryClient factory = null;
+ JobDetails jobDetails = new JobDetails();
+
+ try {
+ factory = new FactoryClient(eprt, secProperties);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
+ CreateActivityResponseDocument response = null;
+ try {
+ log.info(String.format("Activity Submitting to %s ... \n",
+ factoryUrl));
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ response = factory.createActivity(cad);
+ log.info(String.format("Activity Submitted to %s \n", factoryUrl));
+ } catch (Exception e) {
+ throw new GFacProviderException("Cannot create activity.", e);
+ }
+ EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+ log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+ // factory.waitWhileActivityIsDone(activityEpr, 1000);
+ jobId = WSUtilities.extractResourceID(activityEpr);
+ if (jobId == null) {
+ jobId = new Long(Calendar.getInstance().getTimeInMillis())
+ .toString();
+ }
+ log.info("JobID: " + jobId);
+ jobDetails.setJobID(jobId);
+ jobDetails.setJobDescription(jobId);
+
+ jobExecutionContext.setJobDetails(jobDetails);
+ try {
+ log.info(formatStatusMessage(activityEpr.getAddress()
+ .getStringValue(), factory.getActivityStatus(activityEpr)
+ .toString()));
+
+ jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
+ // GFacUtils.saveJobStatus(jobExecutionContext, details,JobState.SUBMITTED);
+
+ factory.getActivityStatus(activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress()
+ .getStringValue(), factory.getActivityStatus(activityEpr)
+ .toString()));
+
+ // TODO publish the status messages to the message bus
+ while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
+
+ ActivityStatusType activityStatus = null;
+ try {
+ activityStatus = getStatus(factory, activityEpr);
+ JobState applicationJobStatus = getApplicationJobStatus(activityStatus);
+ String jobStatusMessage = "Status of job " + jobId + "is "
+ + applicationJobStatus;
+ //TODO: properly use GFacUtils..
+ // GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
+
+ jobExecutionContext.getNotifier().publish(
+ new StatusChangeEvent(jobStatusMessage));
+
+ // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId,
+ // applicationJobStatus);
+ } catch (UnknownActivityIdentifierFault e) {
+ throw new GFacProviderException(e.getMessage(),
+ e.getCause());
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+ }catch(Exception e) {
+ throw new GFacProviderException(e.getMessage(),
+ e.getCause());
+
+ }
+
+ ActivityStatusType activityStatus = null;
+ try {
+ activityStatus = getStatus(factory, activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
+ ActivityClient activityClient;
+ activityClient = new ActivityClient(activityEpr,secProperties);
+ dt.setStorageClient(activityClient.getUspaceClient());
+ } catch (Exception e1) {
+ throw new GFacProviderException(e1.getMessage(),
+ e1.getCause());
+ }
+
+
+
+ if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+ String error = activityStatus.getFault().getFaultcode()
+ .getLocalPart()
+ + "\n"
+ + activityStatus.getFault().getFaultstring()
+ + "\n EXITCODE: " + activityStatus.getExitCode();
+ log.info(error);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ dt.downloadStdOuts();
+ } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+ JobState applicationJobStatus = JobState.CANCELED;
+ String jobStatusMessage = "Status of job " + jobId + "is "
+ + applicationJobStatus;
+ jobExecutionContext.getNotifier().publish(
+ new StatusChangeEvent(jobStatusMessage));
+ //TODO: properly use GFacUtils..
+ // GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
+ throw new GFacProviderException(
+ jobExecutionContext.getExperimentID() + "Job Canceled");
+ }
+
+ else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ if (activityStatus.getExitCode() == 0) {
+ dt.downloadRemoteFiles();
+ } else {
+ dt.downloadStdOuts();
+ }
+ }
+
+ } finally {
+ // destroy sms instance
+ try {
+ if (sc != null) {
+ sc.destroy();
+ }
+ } catch (Exception e) {
+ log.warn(
+ "Cannot destroy temporary SMS instance:" + sc.getUrl(),
+ e);
+ }
+ }
+
+ }
++>>>>>>> f7de359dcae3694912248e50a1a2fd5e30fc613e
private JobState getApplicationJobStatus(ActivityStatusType activityStatus) {
if (activityStatus == null) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 25113fd,66cc5f7..171ca07
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@@ -156,9 -158,11 +156,10 @@@ public class HPCPullMonitor extends Pul
try {
take = this.queue.take();
List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
- for (HostMonitorData iHostMonitorData : hostMonitorData) {
+ for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) {
+ HostMonitorData iHostMonitorData = hostIterator.next();
- if (iHostMonitorData.getHost().getType() instanceof GsisshHostType
- || iHostMonitorData.getHost().getType() instanceof SSHHostType) {
- String hostName = iHostMonitorData.getHost().getType().getHostAddress();
+ if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ String hostName = iHostMonitorData.getComputeResourceDescription().getHostName();
ResourceConnection connection = null;
if (connections.containsKey(hostName)) {
if (!connections.get(hostName).isConnected()) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/65ad5860/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
----------------------------------------------------------------------
diff --cc modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
index a83f5f1,8e9ae58..edbf39e
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ExperimentRegistry.java
@@@ -93,11 -91,12 +93,12 @@@ public class ExperimentRegistry
addUserConfigData(userConfigurationData, experimentID);
}
- List<DataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+ List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
if (experimentOutputs != null && !experimentOutputs.isEmpty()){
- for (OutputDataObjectType output : experimentOutputs){
- output.setValue("");
- }
+ //TODO: short change.
+ // for (DataObjectType output : experimentOutputs){
+ // output.setValue("");
+ // }
addExpOutputs(experimentOutputs, experimentID);
}