You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/22 22:42:23 UTC
[4/4] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f2b5df44
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f2b5df44
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f2b5df44
Branch: refs/heads/master
Commit: f2b5df44482f3cf8912a0582bc0c5696c8839243
Parents: 5381d59 aed31d3
Author: lahiru <la...@apache.org>
Authored: Tue Apr 22 16:41:57 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Apr 22 16:41:57 2014 -0400
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 17 ++-
.../api/client/AiravataClientFactory.java | 6 +-
.../error/AiravataClientConnectException.java | 32 ++++++
.../workspace/experiment/ExperimentState.java | 17 +--
.../model/workspace/experiment/JobState.java | 21 ++--
.../model/workspace/experiment/TaskState.java | 17 +--
.../workspace/experiment/TransferState.java | 21 ++--
.../workspace/experiment/WorkflowNodeState.java | 17 +--
.../experimentModel.thrift | 5 +
.../main/resources/airavata-server.properties | 2 +-
.../java/src/main/assembly/bin-assembly.xml | 2 +-
modules/gfac/gfac-core/pom.xml | 7 +-
.../org/apache/airavata/gfac/Scheduler.java | 2 -
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 66 +++++++++--
.../airavata/gfac/provider/GFacProvider.java | 12 +-
.../gfac/provider/impl/AbstractProvider.java | 51 ++++++++-
.../gfac/provider/impl/BESProvider.java | 2 +-
.../gfac/provider/impl/GSISSHProvider.java | 17 +--
.../gfac/provider/impl/GramProvider.java | 3 +-
.../gfac/provider/impl/HadoopProvider.java | 4 +-
.../gfac/provider/impl/LocalProvider.java | 9 +-
modules/gfac/gfac-ec2/pom.xml | 6 +-
.../apache/airavata/gfac/ec2/EC2Provider.java | 54 ++++-----
.../job/monitor/AbstractActivityListener.java | 27 +++++
.../monitor/AbstractActivityMonitorClient.java | 27 -----
.../AiravataExperimentStatusUpdator.java | 82 +++++++++++++
.../job/monitor/AiravataJobStatusUpdator.java | 50 +++++++-
.../job/monitor/AiravataTaskStatusUpdator.java | 114 +++++++++++++++++++
.../AiravataWorkflowNodeStatusUpdator.java | 86 ++++++++++++++
.../airavata/job/monitor/MonitorManager.java | 56 +++++----
.../command/ExperimentCancelRequest.java | 38 +++++++
.../job/monitor/command/TaskCancelRequest.java | 52 +++++++++
.../QstatMonitorTestWithMyProxyAuth.java | 4 +-
.../gfac/provider/impl/SSHProvider.java | 3 +-
.../airavata/integration/SimpleEchoIT.java | 12 +-
.../SingleAppIntegrationTestBase.java | 71 +++++-------
.../WorkflowIntegrationTestBase.java | 29 +++--
.../orchestrator/server/OrchestratorServer.java | 3 -
.../server/OrchestratorServerHandler.java | 8 +-
.../core/context/OrchestratorContext.java | 18 ++-
.../core/impl/EmbeddedGFACJobSubmitter.java | 9 +-
.../airavata/orchestrator/cpi/Orchestrator.java | 10 +-
.../cpi/impl/AbstractOrchestrator.java | 2 -
.../cpi/impl/SimpleOrchestratorImpl.java | 37 ++++--
.../registry/jpa/impl/ExperimentRegistry.java | 10 +-
.../jpa/resources/TaskDetailResource.java | 3 +
.../airavata/api/samples/ExperimentSample.java | 5 +-
.../apache/airavata/job/monitor/MonitorID.java | 28 +++--
.../job/monitor/core/MessageParser.java | 2 +-
.../job/monitor/event/MonitorPublisher.java | 20 ++--
.../job/monitor/impl/LocalJobMonitor.java | 4 +-
.../monitor/impl/pull/qstat/QstatMonitor.java | 4 +-
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 4 +-
.../monitor/impl/push/amqp/BasicConsumer.java | 2 +-
.../impl/push/amqp/JSONMessageParser.java | 2 +-
.../impl/push/amqp/UnRegisterWorker.java | 4 +-
.../state/AbstractStateChangeRequest.java | 37 ++++++
.../state/ExperimentStatusChangeRequest.java | 55 +++++++++
.../airavata/job/monitor/state/JobStatus.java | 67 -----------
.../monitor/state/JobStatusChangeRequest.java | 56 +++++++++
.../job/monitor/state/PublisherMessage.java | 26 +++++
.../monitor/state/TaskStatusChangeRequest.java | 53 +++++++++
.../apache/airavata/job/AMQPMonitorTest.java | 4 +-
.../job/QstatMonitorTestWithMyProxyAuth.java | 6 +-
64 files changed, 1142 insertions(+), 378 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
index 5914dd3,0000000..12e2ad1
mode 100644,000000..100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
@@@ -1,262 -1,0 +1,261 @@@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.provider.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
- import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.CommandExecutor;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
+import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.NameValuePairType;
+import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * Execute application using remote SSH
+ */
- public class SSHProvider extends AbstractProvider implements GFacProvider{
++public class SSHProvider extends AbstractProvider{
+ private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
+ private Cluster cluster;
+ private String jobID = null;
+ private String taskID = null;
+ // We keep a GSISSHProvider to support qsub submission in case of an HPC scenario with SSH
+ private GSISSHProvider gsiSshProvider = null;
+
+ /**
+  * Prepares this provider for the given job.
+  *
+  * For a host flagged as an HPC resource only a {@link GSISSHProvider} is created here.
+  * Otherwise a job id is generated, the job details are populated and saved with state
+  * {@code SETUP}, and a generated launcher script is copied to the remote working directory.
+  *
+  * @param jobExecutionContext context describing the host, application and task
+  * @throws GFacProviderException if creating or copying the launcher script fails
+  * @throws GFacException declared by the signature; may be raised by the calls made here
+  */
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+     super.initialize(jobExecutionContext);
+     taskID = jobExecutionContext.getTaskData().getTaskID();
+
+     SSHHostType sshHost = (SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
+     if (sshHost.getHpcResource()) {
+         // HPC hosts are handled through the GSISSH provider; nothing else is set up here.
+         gsiSshProvider = new GSISSHProvider();
+         return;
+     }
+
+     jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
+     cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+
+     ApplicationDeploymentDescriptionType deployment = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+     String remoteScriptPath = deployment.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+
+     // `details` is not declared in this class; note the job id stored is the task id, not `jobID`.
+     details.setJobID(taskID);
+     details.setJobDescription(remoteScriptPath);
+     jobExecutionContext.setJobDetails(details);
+     JobDescriptor descriptor = GFacUtils.createJobDescriptor(jobExecutionContext, deployment, null);
+     // The description is overwritten with the XML form of the descriptor once it is available.
+     details.setJobDescription(descriptor.toXML());
+
+     GFacUtils.saveJobStatus(details, JobState.SETUP, taskID);
+     log.info(remoteScriptPath);
+     try {
+         File localScript = createShellScript(jobExecutionContext);
+         cluster.scpTo(remoteScriptPath, localScript.getAbsolutePath());
+     } catch (Exception e) {
+         throw new GFacProviderException(e.getLocalizedMessage(), e);
+     }
+ }
+
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ if (gsiSshProvider == null) {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ try {
+ /*
+ * Execute
+ */
+ String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ details.setJobDescription(execuable);
+
+// GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
+ RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
+
+ StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
+
+ CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
+ String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
+
+ log.info("stdout=" + stdOutputString);
+
+// GFacUtils.updateJobStatus(details, JobState.COMPLETE);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ } finally {
+ if (cluster != null) {
+ try {
+ cluster.disconnect();
+ } catch (SSHApiException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+ }
+ } else {
+ try {
+ gsiSshProvider.execute(jobExecutionContext);
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+  * Disposes the wrapped {@link GSISSHProvider}, if one was created; otherwise does nothing.
+  *
+  * @param jobExecutionContext context passed through to the wrapped provider
+  * @throws GFacProviderException wrapping any {@link GFacException} raised by the wrapped provider
+  */
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+     if (gsiSshProvider == null) {
+         return;
+     }
+     try {
+         gsiSshProvider.dispose(jobExecutionContext);
+     } catch (GFacException e) {
+         throw new GFacProviderException(e.getMessage(),e);
+     }
+ }
+
+
+ /**
+  * Job cancellation is not implemented for this provider: every call throws
+  * {@code NotImplementedException} and neither argument is read.
+  *
+  * NOTE(review): the exception type is imported from the JDK-internal package
+  * {@code sun.reflect.generics.reflectiveObjects}; consider a public type such as
+  * {@link UnsupportedOperationException} once callers have been checked.
+  *
+  * @param jobId identifier of the job to cancel (unused)
+  * @param jobExecutionContext execution context of the job (unused)
+  * @throws GFacException declared by the signature but never thrown by this body
+  */
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ throw new NotImplementedException();
+ }
+
+
+ /**
+  * Writes a bash launcher script for the application to a local temporary file.
+  *
+  * The script changes into the static working directory, exports the input/output data
+  * directory variables and the application's environment entries, runs the executable with
+  * all input parameters (stdout and stderr redirected to the configured files) and finally
+  * echoes a fixed marker string.
+  *
+  * @param context job context supplying the deployment description and input parameters
+  * @return the local temporary file containing the script
+  * @throws IOException if the temporary file cannot be created or written
+  */
+ private File createShellScript(JobExecutionContext context) throws IOException {
+     ApplicationDeploymentDescriptionType app = context.getApplicationContext()
+             .getApplicationDeploymentDescription().getType();
+     String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
+             + new Random().nextLong();
+
+     // createTempFile appends the suffix verbatim, so the dot has to be part of it.
+     File shellScript = File.createTempFile(uniqueDir, ".sh");
+     OutputStream out = new FileOutputStream(shellScript);
+     try {
+         out.write("#!/bin/bash\n".getBytes());
+         out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
+         out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
+         out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
+                 .getBytes());
+
+         // Collect the application environment; going through a map keeps one value per name.
+         NameValuePairType[] env = app.getApplicationEnvironmentArray();
+         Map<String, String> nv = new HashMap<String, String>();
+         if (env != null) {
+             for (NameValuePairType pair : env) {
+                 nv.put(pair.getName(), pair.getValue());
+             }
+         }
+         for (Entry<String, String> entry : nv.entrySet()) {
+             log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
+             out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+         }
+
+         // Build the command line: executable followed by every input parameter value.
+         final String SPACE = " ";
+         StringBuilder cmd = new StringBuilder();
+         cmd.append(app.getExecutableLocation());
+         cmd.append(SPACE);
+
+         MessageContext input = context.getInMessageContext();
+         Map<String, Object> inputs = input.getParameters();
+         for (Object parameter : inputs.values()) {
+             ActualParameter actualParameter = (ActualParameter) parameter;
+             if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+                 // Array parameters contribute one argument per element.
+                 String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
+                 for (String value : values) {
+                     cmd.append(value);
+                     cmd.append(SPACE);
+                 }
+             } else {
+                 cmd.append(MappingFactory.toString(actualParameter));
+                 cmd.append(SPACE);
+             }
+         }
+
+         // Redirect stdout and stderr to the configured remote files; they are read later.
+         cmd.append(SPACE);
+         cmd.append("1>");
+         cmd.append(SPACE);
+         cmd.append(app.getStandardOutput());
+         cmd.append(SPACE);
+         cmd.append("2>");
+         cmd.append(SPACE);
+         cmd.append(app.getStandardError());
+
+         String cmdStr = cmd.toString();
+         log.info("Command = " + cmdStr);
+         out.write((cmdStr + "\n").getBytes());
+         String message = "\"execuationSuceeded\"";
+         out.write(("echo " + message + "\n").getBytes());
+     } finally {
+         // Always release the file handle, including when a write or parameter lookup fails.
+         out.close();
+     }
+
+     return shellScript;
+ }
+
+ /**
+  * Forwards provider properties to the wrapped {@link GSISSHProvider}, if one has been created;
+  * otherwise does nothing.
+  *
+  * @param properties provider configuration properties
+  * @throws GFacProviderException wrapping any {@link GFacException} raised by the wrapped provider
+  * @throws GFacException declared by the signature
+  */
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+     if (gsiSshProvider == null) {
+         return;
+     }
+     try {
+         // Must target the wrapped provider: an unqualified call here would invoke this same
+         // method again and recurse until the stack overflows.
+         gsiSshProvider.initProperties(properties);
+     } catch (GFacException e) {
+         throw new GFacProviderException(e.getMessage(),e);
+     }
+ }
+ /**
+  * Returns the captured standard output of a remote command, treating the command as failed
+  * when standard output is null or empty, or when anything was written to standard error.
+  *
+  * @param jobIDReaderCommandOutput reader holding the command's captured stdout and stderr
+  * @param errorMsg prefix for the exception message; the stderr text is appended to it
+  * @return the non-empty standard output string
+  * @throws SSHApiException if stdout is missing/empty or stderr is non-empty
+  */
+ private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
+     String capturedOut = jobIDReaderCommandOutput.getStdOutputString();
+     String capturedErr = jobIDReaderCommandOutput.getStdErrorString();
+
+     boolean hasOutput = capturedOut != null && !capturedOut.isEmpty();
+     boolean hasError = capturedErr != null && !capturedErr.isEmpty();
+     if (hasOutput && !hasError) {
+         return capturedOut;
+     }
+
+     log.error("Standard Error output : " + capturedErr);
+     throw new SSHApiException(errorMsg + capturedErr);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------