You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/22 22:42:23 UTC

[4/4] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f2b5df44
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f2b5df44
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f2b5df44

Branch: refs/heads/master
Commit: f2b5df44482f3cf8912a0582bc0c5696c8839243
Parents: 5381d59 aed31d3
Author: lahiru <la...@apache.org>
Authored: Tue Apr 22 16:41:57 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Apr 22 16:41:57 2014 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   |  17 ++-
 .../api/client/AiravataClientFactory.java       |   6 +-
 .../error/AiravataClientConnectException.java   |  32 ++++++
 .../workspace/experiment/ExperimentState.java   |  17 +--
 .../model/workspace/experiment/JobState.java    |  21 ++--
 .../model/workspace/experiment/TaskState.java   |  17 +--
 .../workspace/experiment/TransferState.java     |  21 ++--
 .../workspace/experiment/WorkflowNodeState.java |  17 +--
 .../experimentModel.thrift                      |   5 +
 .../main/resources/airavata-server.properties   |   2 +-
 .../java/src/main/assembly/bin-assembly.xml     |   2 +-
 modules/gfac/gfac-core/pom.xml                  |   7 +-
 .../org/apache/airavata/gfac/Scheduler.java     |   2 -
 .../org/apache/airavata/gfac/cpi/GFacImpl.java  |  66 +++++++++--
 .../airavata/gfac/provider/GFacProvider.java    |  12 +-
 .../gfac/provider/impl/AbstractProvider.java    |  51 ++++++++-
 .../gfac/provider/impl/BESProvider.java         |   2 +-
 .../gfac/provider/impl/GSISSHProvider.java      |  17 +--
 .../gfac/provider/impl/GramProvider.java        |   3 +-
 .../gfac/provider/impl/HadoopProvider.java      |   4 +-
 .../gfac/provider/impl/LocalProvider.java       |   9 +-
 modules/gfac/gfac-ec2/pom.xml                   |   6 +-
 .../apache/airavata/gfac/ec2/EC2Provider.java   |  54 ++++-----
 .../job/monitor/AbstractActivityListener.java   |  27 +++++
 .../monitor/AbstractActivityMonitorClient.java  |  27 -----
 .../AiravataExperimentStatusUpdator.java        |  82 +++++++++++++
 .../job/monitor/AiravataJobStatusUpdator.java   |  50 +++++++-
 .../job/monitor/AiravataTaskStatusUpdator.java  | 114 +++++++++++++++++++
 .../AiravataWorkflowNodeStatusUpdator.java      |  86 ++++++++++++++
 .../airavata/job/monitor/MonitorManager.java    |  56 +++++----
 .../command/ExperimentCancelRequest.java        |  38 +++++++
 .../job/monitor/command/TaskCancelRequest.java  |  52 +++++++++
 .../QstatMonitorTestWithMyProxyAuth.java        |   4 +-
 .../gfac/provider/impl/SSHProvider.java         |   3 +-
 .../airavata/integration/SimpleEchoIT.java      |  12 +-
 .../SingleAppIntegrationTestBase.java           |  71 +++++-------
 .../WorkflowIntegrationTestBase.java            |  29 +++--
 .../orchestrator/server/OrchestratorServer.java |   3 -
 .../server/OrchestratorServerHandler.java       |   8 +-
 .../core/context/OrchestratorContext.java       |  18 ++-
 .../core/impl/EmbeddedGFACJobSubmitter.java     |   9 +-
 .../airavata/orchestrator/cpi/Orchestrator.java |  10 +-
 .../cpi/impl/AbstractOrchestrator.java          |   2 -
 .../cpi/impl/SimpleOrchestratorImpl.java        |  37 ++++--
 .../registry/jpa/impl/ExperimentRegistry.java   |  10 +-
 .../jpa/resources/TaskDetailResource.java       |   3 +
 .../airavata/api/samples/ExperimentSample.java  |   5 +-
 .../apache/airavata/job/monitor/MonitorID.java  |  28 +++--
 .../job/monitor/core/MessageParser.java         |   2 +-
 .../job/monitor/event/MonitorPublisher.java     |  20 ++--
 .../job/monitor/impl/LocalJobMonitor.java       |   4 +-
 .../monitor/impl/pull/qstat/QstatMonitor.java   |   4 +-
 .../job/monitor/impl/push/amqp/AMQPMonitor.java |   4 +-
 .../monitor/impl/push/amqp/BasicConsumer.java   |   2 +-
 .../impl/push/amqp/JSONMessageParser.java       |   2 +-
 .../impl/push/amqp/UnRegisterWorker.java        |   4 +-
 .../state/AbstractStateChangeRequest.java       |  37 ++++++
 .../state/ExperimentStatusChangeRequest.java    |  55 +++++++++
 .../airavata/job/monitor/state/JobStatus.java   |  67 -----------
 .../monitor/state/JobStatusChangeRequest.java   |  56 +++++++++
 .../job/monitor/state/PublisherMessage.java     |  26 +++++
 .../monitor/state/TaskStatusChangeRequest.java  |  53 +++++++++
 .../apache/airavata/job/AMQPMonitorTest.java    |   4 +-
 .../job/QstatMonitorTestWithMyProxyAuth.java    |   6 +-
 64 files changed, 1142 insertions(+), 378 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
index 5914dd3,0000000..12e2ad1
mode 100644,000000..100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
@@@ -1,262 -1,0 +1,261 @@@
 +/*
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + *
 + */
 +
 +package org.apache.airavata.gfac.provider.impl;
 +
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.util.Calendar;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.airavata.commons.gfac.type.ActualParameter;
 +import org.apache.airavata.commons.gfac.type.MappingFactory;
 +import org.apache.airavata.gfac.Constants;
 +import org.apache.airavata.gfac.GFacException;
 +import org.apache.airavata.gfac.context.JobExecutionContext;
 +import org.apache.airavata.gfac.context.MessageContext;
 +import org.apache.airavata.gfac.context.security.SSHSecurityContext;
- import org.apache.airavata.gfac.provider.GFacProvider;
 +import org.apache.airavata.gfac.provider.GFacProviderException;
 +import org.apache.airavata.gfac.utils.GFacUtils;
 +import org.apache.airavata.gsi.ssh.api.Cluster;
 +import org.apache.airavata.gsi.ssh.api.CommandExecutor;
 +import org.apache.airavata.gsi.ssh.api.SSHApiException;
 +import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 +import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
 +import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
 +import org.apache.airavata.model.workspace.experiment.JobState;
 +import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 +import org.apache.airavata.schemas.gfac.NameValuePairType;
 +import org.apache.airavata.schemas.gfac.SSHHostType;
 +import org.apache.airavata.schemas.gfac.URIArrayType;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 +
 +/**
 + * Execute application using remote SSH
 + */
- public class SSHProvider extends AbstractProvider implements GFacProvider{
++public class SSHProvider extends AbstractProvider{
 +    // Class-wide SLF4J logger.
 +    private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
 +    // Remote cluster handle; assigned in initialize() only when the host is not an HPC resource.
 +    private Cluster cluster;
 +    // Identifier built in initialize() as "SSH_<host address>_<current millis>" for non-HPC hosts.
 +    private String jobID = null;
 +    // Task ID read from the job execution context's task data in initialize().
 +    private String taskID = null;
 +    // We keep a GSISSHProvider to support qsub submission in case of an HPC scenario with SSH;
 +    // it is created in initialize() for HPC hosts and stays null otherwise.
 +    private GSISSHProvider gsiSshProvider = null;
 +
 +    /**
 +     * Prepares this provider for the given job.
 +     * <p>
 +     * For an HPC host only a {@link GSISSHProvider} delegate is created. For any other
 +     * SSH host the job details are recorded, the job status is saved as
 +     * {@link JobState#SETUP}, and a generated run script is copied to the remote
 +     * working directory.
 +     *
 +     * @param jobExecutionContext context describing the host, application and task
 +     * @throws GFacProviderException if creating or copying the run script fails
 +     * @throws GFacException         if the underlying GFac calls fail
 +     */
 +    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
 +        super.initialize(jobExecutionContext);
 +        taskID = jobExecutionContext.getTaskData().getTaskID();
 +
 +        SSHHostType sshHost = (SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
 +        if (sshHost.getHpcResource()) {
 +            // HPC hosts are served through the GSISSH provider instead of a plain script run.
 +            gsiSshProvider = new GSISSHProvider();
 +            return;
 +        }
 +
 +        jobID = "SSH_" + sshHost.getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
 +        SSHSecurityContext securityContext =
 +                (SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
 +        cluster = securityContext.getPbsCluster();
 +
 +        ApplicationDeploymentDescriptionType app =
 +                jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
 +        String remoteScriptPath = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
 +
 +        // Register the job details on the context before the descriptor is generated from it.
 +        details.setJobID(taskID);
 +        details.setJobDescription(remoteScriptPath);
 +        jobExecutionContext.setJobDetails(details);
 +        JobDescriptor descriptor = GFacUtils.createJobDescriptor(jobExecutionContext, app, null);
 +        details.setJobDescription(descriptor.toXML());
 +
 +        GFacUtils.saveJobStatus(details, JobState.SETUP, taskID);
 +        log.info(remoteScriptPath);
 +
 +        try {
 +            File localScript = createShellScript(jobExecutionContext);
 +            cluster.scpTo(remoteScriptPath, localScript.getAbsolutePath());
 +        } catch (Exception e) {
 +            throw new GFacProviderException(e.getLocalizedMessage(), e);
 +        }
 +    }
 +
 +
 +    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
 +        if (gsiSshProvider == null) {
 +            ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
 +            try {
 +                /*
 +                 * Execute
 +                 */
 +                String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
 +                details.setJobDescription(execuable);
 +
 +//                GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
 +                RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
 +
 +                StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
 +                
 +                CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
 +                String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
 +
 +                log.info("stdout=" + stdOutputString);
 +             
 +//                GFacUtils.updateJobStatus(details, JobState.COMPLETE);
 +            } catch (Exception e) {
 +                throw new GFacProviderException(e.getMessage(), e);
 +            } finally {
 +                if (cluster != null) {
 +                	try {
 +						cluster.disconnect();
 +					} catch (SSHApiException e) {
 +						throw new GFacProviderException(e.getMessage(), e);
 +					}
 +                }
 +            }
 +        } else {
 +            try {
 +                gsiSshProvider.execute(jobExecutionContext);
 +            } catch (GFacException e) {
 +                throw new GFacProviderException(e.getMessage(), e);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Releases provider resources by forwarding to the {@link GSISSHProvider}
 +     * delegate; does nothing when no delegate was created.
 +     *
 +     * @param jobExecutionContext context of the job being disposed
 +     * @throws GFacProviderException if the delegate fails with a {@link GFacException}
 +     */
 +    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
 +        if (gsiSshProvider == null) {
 +            return;
 +        }
 +        try {
 +            gsiSshProvider.dispose(jobExecutionContext);
 +        } catch (GFacException e) {
 +            throw new GFacProviderException(e.getMessage(), e);
 +        }
 +    }
 +
 +
 +    /**
 +     * Job cancellation is not implemented for this provider; the method always throws.
 +     * <p>
 +     * NOTE(review): the exception thrown is the JDK-internal
 +     * {@code sun.reflect.generics.reflectiveObjects.NotImplementedException} imported by
 +     * this file, not a public API type; consider {@link UnsupportedOperationException}
 +     * once callers have been checked.
 +     *
 +     * @param jobId               identifier of the job to cancel (unused)
 +     * @param jobExecutionContext context of the job (unused)
 +     * @throws GFacException declared but never thrown by this implementation
 +     */
 +    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
 +        throw new NotImplementedException();
 +    }
 +
 +
 +    /**
 +     * Generates a local bash script that runs the application described by the context.
 +     * <p>
 +     * The script changes to the static working directory, exports the input/output
 +     * data directory variables and the application environment, runs the executable
 +     * with every input parameter appended (stdout and stderr redirected to the
 +     * application's configured files), and finally echoes a success marker.
 +     * <p>
 +     * The output stream is closed in a {@code finally} block so the file handle is
 +     * released even if a write or a parameter lookup throws part-way through.
 +     *
 +     * @param context job execution context supplying the application description and inputs
 +     * @return the temporary file holding the generated script
 +     * @throws IOException if the temporary file cannot be created or written
 +     */
 +    private File createShellScript(JobExecutionContext context) throws IOException {
 +        ApplicationDeploymentDescriptionType app = context.getApplicationContext()
 +                .getApplicationDeploymentDescription().getType();
 +        String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
 +                + new Random().nextLong();
 +
 +        // NOTE(review): the suffix has no leading dot, so the file name ends in "sh" rather than ".sh".
 +        File shellScript = File.createTempFile(uniqueDir, "sh");
 +        OutputStream out = new FileOutputStream(shellScript);
 +        try {
 +            out.write("#!/bin/bash\n".getBytes());
 +            out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
 +            out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
 +            out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
 +                    .getBytes());
 +
 +            // Collect the application environment; a later entry with the same name overrides an earlier one.
 +            NameValuePairType[] env = app.getApplicationEnvironmentArray();
 +            Map<String, String> nv = new HashMap<String, String>();
 +            if (env != null) {
 +                for (int i = 0; i < env.length; i++) {
 +                    nv.put(env[i].getName(), env[i].getValue());
 +                }
 +            }
 +            for (Entry<String, String> entry : nv.entrySet()) {
 +                log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
 +                out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
 +            }
 +
 +            // Prepare the command: executable followed by every input parameter value.
 +            final String SPACE = " ";
 +            StringBuilder cmd = new StringBuilder();
 +            cmd.append(app.getExecutableLocation());
 +            cmd.append(SPACE);
 +
 +            MessageContext input = context.getInMessageContext();
 +            Map<String, Object> inputs = input.getParameters();
 +            Set<String> keys = inputs.keySet();
 +            for (String paramName : keys) {
 +                ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
 +                if ("URIArray".equals(actualParameter.getType().getType().toString())) {
 +                    // URI arrays contribute one argument per element.
 +                    String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
 +                    for (String value : values) {
 +                        cmd.append(value);
 +                        cmd.append(SPACE);
 +                    }
 +                } else {
 +                    String paramValue = MappingFactory.toString(actualParameter);
 +                    cmd.append(paramValue);
 +                    cmd.append(SPACE);
 +                }
 +            }
 +            // We redirect the error and stdout to remote files, they will be read
 +            // in later
 +            cmd.append(SPACE);
 +            cmd.append("1>");
 +            cmd.append(SPACE);
 +            cmd.append(app.getStandardOutput());
 +            cmd.append(SPACE);
 +            cmd.append("2>");
 +            cmd.append(SPACE);
 +            cmd.append(app.getStandardError());
 +
 +            String cmdStr = cmd.toString();
 +            log.info("Command = " + cmdStr);
 +            out.write((cmdStr + "\n").getBytes());
 +            String message = "\"execuationSuceeded\"";
 +            out.write(("echo " + message + "\n").getBytes());
 +        } finally {
 +            // Always release the file handle, including on failure.
 +            out.close();
 +        }
 +
 +        return shellScript;
 +    }
 +
 +    /**
 +     * Passes provider properties on to the {@link GSISSHProvider} delegate, if one exists.
 +     * <p>
 +     * The call must target the delegate: invoking {@code initProperties} on this
 +     * object would recurse without end and terminate with a {@link StackOverflowError}.
 +     * When no delegate has been created the properties are ignored.
 +     *
 +     * @param properties provider configuration properties
 +     * @throws GFacProviderException if the delegate fails with a {@link GFacException}
 +     * @throws GFacException         declared for interface compatibility
 +     */
 +    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
 +         if (gsiSshProvider != null){
 +            try {
 +               gsiSshProvider.initProperties(properties);
 +            } catch (GFacException e) {
 +                throw new GFacProviderException(e.getMessage(),e);
 +            }
 +        }
 +    }
 +    /**
 +     * Returns the captured standard output of a remote command, treating missing
 +     * output or any captured standard error as a failure.
 +     *
 +     * @param jobIDReaderCommandOutput reader holding the command's stdout and stderr
 +     * @param errorMsg                 prefix for the exception message on failure
 +     * @return the non-empty standard output string
 +     * @throws SSHApiException if stdout is null or empty, or stderr is non-empty
 +     */
 +    private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
 +        String capturedOut = jobIDReaderCommandOutput.getStdOutputString();
 +        String capturedErr = jobIDReaderCommandOutput.getStdErrorString();
 +
 +        boolean hasOutput = capturedOut != null && !capturedOut.isEmpty();
 +        boolean hasError = capturedErr != null && !capturedErr.isEmpty();
 +        if (hasOutput && !hasError) {
 +            return capturedOut;
 +        }
 +
 +        log.error("Standard Error output : " + capturedErr);
 +        throw new SSHApiException(errorMsg + capturedErr);
 +    }
 +
 +
 +}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/f2b5df44/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------