You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2011/06/24 13:01:18 UTC
svn commit: r1139241 [2/3] - in /incubator/airavata/core/trunk/gfac: ./
.settings/ src/ src/main/ src/main/java/ src/main/java/org/
src/main/java/org/apache/ src/main/java/org/apache/airavata/
src/main/java/org/apache/airavata/core/ src/main/java/org/a...
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/EC2Provider.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/EC2Provider.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/EC2Provider.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/EC2Provider.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,555 @@
+package org.apache.airavata.core.gfac.provider;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.spec.InvalidKeySpecException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.namespace.QName;
+
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.common.Base64;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.connection.channel.direct.Session.Command;
+import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
+
+import org.apache.airavata.core.gfac.context.ExecutionContext;
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.context.impl.AmazonSecurityContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.exception.GfacException.FaultCode;
+import org.apache.airavata.core.gfac.model.ExecutionModel;
+import org.apache.airavata.core.gfac.notification.NotificationService;
+import org.apache.airavata.core.gfac.utils.GFacConstants;
+import org.apache.airavata.core.gfac.utils.GfacUtils;
+import org.apache.airavata.core.gfac.utils.OutputUtils;
+import org.bouncycastle.openssl.PEMWriter;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
+import com.amazonaws.services.ec2.model.DeleteKeyPairRequest;
+import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
+import com.amazonaws.services.ec2.model.DescribeInstancesResult;
+import com.amazonaws.services.ec2.model.DescribeKeyPairsRequest;
+import com.amazonaws.services.ec2.model.GroupIdentifier;
+import com.amazonaws.services.ec2.model.ImportKeyPairRequest;
+import com.amazonaws.services.ec2.model.Instance;
+import com.amazonaws.services.ec2.model.InstanceStateName;
+import com.amazonaws.services.ec2.model.IpPermission;
+import com.amazonaws.services.ec2.model.RunInstancesRequest;
+import com.amazonaws.services.ec2.model.RunInstancesResult;
+
+import edu.indiana.extreme.lead.workflow_tracking.common.DurationObj;
+
+
+
+public class EC2Provider extends AbstractProvider {
+
+ public static final String AMAZON_SECURITY_CONTEXT = "amazon";
+
+ public static final int SLEEP_TIME_SECOND = 120;
+
+ public static final String KEY_PAIR_NAME = "gfac";
+
+ public static final String KEY_PAIR_FILE = "ec2_rsa";
+
+ private static final String privateKeyFilePath = System.getProperty("user.home") + "/.ssh/" + KEY_PAIR_FILE;
+
+ private Instance instance;
+
+ private static final String SPACE = " ";
+
+ private String buildCommand(List<String> cmdList) {
+ StringBuffer buff = new StringBuffer();
+ for (String string : cmdList) {
+ buff.append(string);
+ buff.append(SPACE);
+ }
+ return buff.toString();
+ }
+
+ @Override
+ public void initialize(InvocationContext invocationContext) throws GfacException {
+ ExecutionContext appExecContext = invocationContext.getExecutionContext();
+ ExecutionModel model = appExecContext.getExecutionModel();
+
+ AmazonSecurityContext amazonSecurityContext = ((AmazonSecurityContext) invocationContext.getSecurityContext(AMAZON_SECURITY_CONTEXT));
+ String access_key = amazonSecurityContext.getAccessKey();
+ String secret_key = amazonSecurityContext.getSecretKey();
+
+ //TODO way to read value (header or xregistry)
+ String ami_id = "";
+ String ins_type = "";
+ String ins_id = "";
+
+ /*
+ * Need to start EC2 instance before running it
+ */
+ AWSCredentials credential = new BasicAWSCredentials(access_key, secret_key);
+ AmazonEC2Client ec2client = new AmazonEC2Client(credential);
+
+ try {
+ /*
+ * Build key pair before start instance
+ */
+ buildKeyPair(ec2client);
+
+ // right now, we can run it on one host
+ if (ami_id != null)
+ this.instance = startInstances(ec2client, ami_id, ins_type, invocationContext.getExecutionContext().getNotificationService()).get(0);
+ else {
+
+ // already running instance
+ DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest();
+ DescribeInstancesResult describeInstancesResult = ec2client.describeInstances(describeInstancesRequest.withInstanceIds(ins_id));
+
+ if (describeInstancesResult.getReservations().size() == 0 || describeInstancesResult.getReservations().get(0).getInstances().size() == 0) {
+ throw new GfacException("Instance not found:" + ins_id, FaultCode.InvalidRequest);
+ }
+
+ this.instance = describeInstancesResult.getReservations().get(0).getInstances().get(0);
+
+ // check instance keypair
+ if (this.instance.getKeyName() == null || !this.instance.getKeyName().equals(KEY_PAIR_NAME))
+ throw new GfacException("Keypair for instance:" + ins_id + " is not valid", FaultCode.InvalidRequest);
+ }
+
+ //send out instance id
+ invocationContext.getExecutionContext().getNotificationService().sendResourceMappingNotifications(this.instance.getPublicDnsName(), "EC2 Instance " + this.instance.getInstanceId() + " is running with public name " + this.instance.getPublicDnsName(), this.instance.getInstanceId());
+
+
+ /*
+ * Make sure port 22 is connectable
+ */
+ for (GroupIdentifier g : this.instance.getSecurityGroups()) {
+ IpPermission ip = new IpPermission();
+ ip.setIpProtocol("tcp");
+ ip.setFromPort(22);
+ ip.setToPort(22);
+ AuthorizeSecurityGroupIngressRequest r = new AuthorizeSecurityGroupIngressRequest();
+ r = r.withIpPermissions(ip.withIpRanges("0.0.0.0/0"));
+ r.setGroupId(g.getGroupId());
+ try {
+ ec2client.authorizeSecurityGroupIngress(r);
+ } catch (AmazonServiceException as) {
+ /*
+ * If exception is from duplicate room, ignore it.
+ */
+ if (!as.getErrorCode().equals("InvalidPermission.Duplicate"))
+ throw as;
+ }
+ }
+
+ } catch (Exception e) {
+ // TODO throw out
+ e.printStackTrace();
+ log.error(e.getMessage(), e);
+ throw new GfacException(e, FaultCode.InvalidRequest);
+ }
+
+ //set Host location
+ model.setHost(this.instance.getPublicDnsName());
+
+ /*
+ * Make directory
+ */
+ SSHClient ssh = new SSHClient();
+ try {
+ ssh.loadKnownHosts();
+ ssh.connect(this.instance.getPublicDnsName());
+
+ ssh.authPublickey(privateKeyFilePath);
+ final Session session = ssh.startSession();
+ try {
+ StringBuilder command = new StringBuilder();
+ command.append("mkdir -p ");
+ command.append(model.getTmpDir());
+ command.append(" | ");
+ command.append("mkdir -p ");
+ command.append(model.getWorkingDir());
+ command.append(" | ");
+ command.append("mkdir -p ");
+ command.append(model.getInputDataDir());
+ command.append(" | ");
+ command.append("mkdir -p ");
+ command.append(model.getOutputDataDir());
+ Command cmd = session.exec(command.toString());
+ cmd.join(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ try {
+ session.close();
+ } catch (Exception e) {
+ }
+ }
+ } catch (Exception e) {
+ throw new GfacException(e.getMessage(), e);
+ } finally {
+ try {
+ ssh.disconnect();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ @Override
+ public void execute(InvocationContext invocationContext) throws GfacException {
+ ExecutionContext context = invocationContext.getExecutionContext();
+ ExecutionModel model = context.getExecutionModel();
+
+ List<String> cmdList = new ArrayList<String>();
+
+ SSHClient ssh = new SSHClient();
+ try {
+
+ /*
+ * Notifier
+ */
+ NotificationService notifier = context.getNotificationService();
+
+ /*
+ * Builder Command
+ */
+ cmdList.add(context.getExecutionModel().getExecutable());
+ cmdList.addAll(context.getExecutionModel().getInputParameters());
+
+ // create process builder from command
+ String command = buildCommand(cmdList);
+
+ //redirect StdOut and StdErr
+ command += SPACE + "1>" + SPACE + model.getStdOut();
+ command += SPACE + "2>" + SPACE + model.getStderr();
+
+ // get the env of the host and the application
+ Map<String, String> nv = context.getExecutionModel().getEnv();
+
+ // extra env's
+ nv.put(GFacConstants.INPUT_DATA_DIR, context.getExecutionModel().getInputDataDir());
+ nv.put(GFacConstants.OUTPUT_DATA_DIR, context.getExecutionModel().getOutputDataDir());
+
+ // log info
+ log.info("Command = " + buildCommand(cmdList));
+ for (String key : nv.keySet()) {
+ log.info("Env[" + key + "] = " + nv.get(key));
+ }
+
+ // notify start
+ DurationObj compObj = notifier.computationStarted();
+
+ /*
+ * Create ssh connection
+ */
+ ssh.loadKnownHosts();
+ ssh.connect(model.getHost());
+ ssh.authPublickey(privateKeyFilePath);
+
+ final Session session = ssh.startSession();
+ try {
+ /*
+ * Build working Directory
+ */
+ log.info("WorkingDir = " + model.getWorkingDir());
+ session.exec("mkdir -p " + model.getWorkingDir());
+ session.exec("cd " + model.getWorkingDir());
+
+ /*
+ * Set environment
+ */
+ for (String key : nv.keySet()) {
+ session.setEnvVar(key, nv.get(key));
+ }
+
+ /*
+ * Execute
+ */
+ Command cmd = session.exec(command);
+ log.info("stdout=" + GfacUtils.readFromStream(session.getInputStream()));
+ cmd.join(5, TimeUnit.SECONDS);
+
+
+ // notify end
+ notifier.computationFinished(compObj);
+
+ /*
+ * check return value. usually not very helpful to draw conclusions
+ * based on return values so don't bother. just provide warning in
+ * the log messages
+ */
+ if (cmd.getExitStatus() != 0) {
+ log.error("Process finished with non zero return value. Process may have failed");
+ } else {
+ log.info("Process finished with return value of zero.");
+ }
+
+ File logDir = new File("./service_logs");
+ if (!logDir.exists()) {
+ logDir.mkdir();
+ }
+
+ // Get the Stdouts and StdErrs
+ QName x = QName.valueOf(invocationContext.getServiceName());
+ String timeStampedServiceName = GfacUtils.createServiceDirName(x);
+ File localStdOutFile = new File(logDir, timeStampedServiceName + ".stdout");
+ File localStdErrFile = new File(logDir, timeStampedServiceName + ".stderr");
+
+ SCPFileTransfer fileTransfer = ssh.newSCPFileTransfer();
+ fileTransfer.download(model.getStdOut(), localStdOutFile.getAbsolutePath());
+ fileTransfer.download(model.getStderr(), localStdErrFile.getAbsolutePath());
+
+ context.getExecutionModel().setStdoutStr(GfacUtils.readFile(localStdOutFile.getAbsolutePath()));
+ context.getExecutionModel().setStderrStr(GfacUtils.readFile(localStdErrFile.getAbsolutePath()));
+
+ // set to context
+ OutputUtils.fillOutputFromStdout(invocationContext.getMessageContext("output"), context.getExecutionModel().getStdoutStr(), context.getExecutionModel().getStderrStr());
+
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ try {
+ session.close();
+ } catch (Exception e) {
+ }
+ }
+ } catch (Exception e) {
+ throw new GfacException(e.getMessage(), e);
+ } finally {
+ try {
+ ssh.disconnect();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ @Override
+ public void dispose(InvocationContext invocationContext) throws GfacException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void abort(InvocationContext invocationContext) throws GfacException {
+ // TODO Auto-generated method stub
+
+ }
+
+
+ private List<Instance> startInstances(AmazonEC2Client ec2, String AMI_ID, String INS_TYPE, NotificationService notifier) throws AmazonServiceException {
+ // start only 1 instance
+ RunInstancesRequest request = new RunInstancesRequest(AMI_ID, 1, 1);
+ request.setKeyName(KEY_PAIR_NAME);
+ request.setInstanceType(INS_TYPE);
+
+ RunInstancesResult result = ec2.runInstances(request);
+
+ List<Instance> instances = result.getReservation().getInstances();
+
+ while (!allInstancesStateEqual(instances, InstanceStateName.Running)) {
+
+ // instance status should not be Terminated
+ if (anyInstancesStateEqual(instances, InstanceStateName.Terminated)) {
+ throw new AmazonClientException("Some Instance is terminated before running a job");
+ }
+
+ // notify the status
+ for (Instance ins: instances) {
+ notifier.info("EC2 Instance " +ins.getInstanceId() + " is " + ins.getState().getName().toString());
+ }
+
+ try {
+ Thread.sleep(SLEEP_TIME_SECOND * 1000l);
+ } catch (Exception ex) {
+ // no op
+ }
+
+ DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest();
+ describeInstancesRequest.setInstanceIds(getInstanceIDs(instances));
+
+ DescribeInstancesResult describeInstancesResult = ec2.describeInstances(describeInstancesRequest);
+ instances = describeInstancesResult.getReservations().get(0).getInstances();
+ }
+
+ log.info("All instances is running");
+ return instances;
+ }
+
+ private void buildKeyPair(AmazonEC2Client ec2) throws NoSuchAlgorithmException, InvalidKeySpecException, AmazonServiceException, AmazonClientException, IOException {
+
+ boolean newKey = false;
+
+ File privateKeyFile = new File(privateKeyFilePath);
+ File publicKeyFile = new File(privateKeyFilePath + ".pub");
+
+ /*
+ * Check if Keypair already created on the server
+ */
+ if (!privateKeyFile.exists()) {
+
+ // check folder and create if it does not exist
+ File sshDir = new File(System.getProperty("user.home") + "/.ssh/");
+ if (!sshDir.exists())
+ sshDir.mkdir();
+
+ // Generate a 1024-bit RSA key pair
+ KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
+ keyGen.initialize(1024);
+ KeyPair keypair = keyGen.genKeyPair();
+
+ FileOutputStream fos = null;
+
+ // Store Public Key.
+ try {
+ fos = new FileOutputStream(privateKeyFilePath + ".pub");
+ fos.write(Base64.encodeBytes(keypair.getPublic().getEncoded()).getBytes());
+ } catch (IOException ioe) {
+ throw ioe;
+ } finally {
+ if (fos != null) {
+ try {
+ fos.close();
+ fos = null;
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ // Store Private Key.
+ try {
+ fos = new FileOutputStream(privateKeyFilePath);
+ StringWriter stringWriter = new StringWriter();
+
+ /*
+ * Write in PEM format (openssl support)
+ */
+ PEMWriter pemFormatWriter = new PEMWriter(stringWriter);
+ pemFormatWriter.writeObject(keypair.getPrivate());
+ pemFormatWriter.close();
+ fos.write(stringWriter.toString().getBytes());
+ } catch (IOException ioe) {
+ throw ioe;
+ } finally {
+ if (fos != null) {
+ try {
+ fos.close();
+ fos = null;
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ privateKeyFile.setWritable(false, false);
+ privateKeyFile.setExecutable(false, false);
+ privateKeyFile.setReadable(false, false);
+ privateKeyFile.setReadable(true);
+ privateKeyFile.setWritable(true);
+
+ // set that this key is just created
+ newKey = true;
+ }
+
+ /*
+ * Read Public Key
+ */
+ String encodedPublicKey = null;
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new FileReader(publicKeyFile));
+ encodedPublicKey = br.readLine();
+ } catch (IOException ioe) {
+ throw ioe;
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ br = null;
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ /*
+ * Generate key pair in Amazon if necessary
+ */
+ try {
+ /*
+ * Get current key pair in Amazon
+ */
+ DescribeKeyPairsRequest describeKeyPairsRequest = new DescribeKeyPairsRequest();
+ ec2.describeKeyPairs(describeKeyPairsRequest.withKeyNames(KEY_PAIR_NAME));
+
+ /*
+ * If key exists and new key is created, delete old key and replace
+ * with new one. Else, do nothing
+ */
+
+ if (newKey) {
+ DeleteKeyPairRequest deleteKeyPairRequest = new DeleteKeyPairRequest(KEY_PAIR_NAME);
+ ec2.deleteKeyPair(deleteKeyPairRequest);
+ ImportKeyPairRequest importKeyPairRequest = new ImportKeyPairRequest(KEY_PAIR_NAME, encodedPublicKey);
+ ec2.importKeyPair(importKeyPairRequest);
+ }
+
+ } catch (AmazonServiceException ase) {
+ /*
+ * Key doesn't exists, import new key.
+ */
+ if(ase.getErrorCode().equals("InvalidKeyPair.NotFound")){
+ ImportKeyPairRequest importKeyPairRequest = new ImportKeyPairRequest(KEY_PAIR_NAME, encodedPublicKey);
+ ec2.importKeyPair(importKeyPairRequest);
+ }else{
+ throw ase;
+ }
+ }
+ }
+
+ private boolean anyInstancesStateEqual(List<Instance> instances, InstanceStateName name) {
+ for (Iterator<Instance> iterator = instances.iterator(); iterator.hasNext();) {
+ Instance instance = (Instance) iterator.next();
+
+ // if one of instance is not running, return false
+ if (InstanceStateName.fromValue(instance.getState().getName()) == name) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean allInstancesStateEqual(List<Instance> instances, InstanceStateName name) {
+ for (Iterator<Instance> iterator = instances.iterator(); iterator.hasNext();) {
+ Instance instance = (Instance) iterator.next();
+
+ // if one of instance is not running, return false
+ if (InstanceStateName.fromValue(instance.getState().getName()) != name) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private List<String> getInstanceIDs(List<Instance> instances) {
+ List<String> ret = new ArrayList<String>();
+ for (Iterator<Instance> iterator = instances.iterator(); iterator.hasNext();) {
+ Instance instance = (Instance) iterator.next();
+ ret.add(instance.getInstanceId());
+ }
+ return ret;
+ }
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,275 @@
+package org.apache.airavata.core.gfac.provider;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.core.gfac.context.ExecutionContext;
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.context.impl.GSISecurityContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.exception.JobSubmissionFault;
+import org.apache.airavata.core.gfac.exception.GfacException.FaultCode;
+import org.apache.airavata.core.gfac.external.GridFtp;
+import org.apache.airavata.core.gfac.model.ExecutionModel;
+import org.apache.airavata.core.gfac.notification.NotificationService;
+import org.apache.airavata.core.gfac.provider.utils.GramRSLGenerator;
+import org.apache.airavata.core.gfac.provider.utils.JobSubmissionListener;
+import org.apache.airavata.core.gfac.utils.ErrorCodes;
+import org.apache.airavata.core.gfac.utils.GfacUtils;
+import org.apache.airavata.core.gfac.utils.OutputUtils;
+import org.apache.airavata.core.gfac.utils.GFacOptions.CurrentProviders;
+import org.globus.gram.GramAttributes;
+import org.globus.gram.GramException;
+import org.globus.gram.GramJob;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ogce.schemas.gfac.documents.GlobusGatekeeperType;
+
+import edu.indiana.extreme.lead.workflow_tracking.common.DurationObj;
+
+public class GramProvider extends AbstractProvider {
+
+ public static final String MYPROXY_SECURITY_CONTEXT = "myproxy";
+
+ public void initialize(InvocationContext invocationContext) throws GfacException {
+ ExecutionContext appExecContext = invocationContext.getExecutionContext();
+ ExecutionModel model = appExecContext.getExecutionModel();
+
+ GridFtp ftp = new GridFtp();
+
+ try {
+ GSSCredential gssCred = ((GSISecurityContext) invocationContext.getSecurityContext(MYPROXY_SECURITY_CONTEXT)).getGssCredentails();
+
+ // get Hostname
+ String hostgridFTP = null;
+
+ if (model.getHostDesc().getHostConfiguration().getGridFTPArray() != null && model.getHostDesc().getHostConfiguration().getGridFTPArray().length > 0) {
+ hostgridFTP = model.getHostDesc().getHostConfiguration().getGridFTPArray(0).getEndPointReference();
+ } else {
+ hostgridFTP = model.getHost();
+ }
+
+ URI tmpdirURI = GfacUtils.createGsiftpURI(hostgridFTP, model.getTmpDir());
+ URI workingDirURI = GfacUtils.createGsiftpURI(hostgridFTP, model.getWorkingDir());
+ URI inputURI = GfacUtils.createGsiftpURI(hostgridFTP, model.getInputDataDir());
+ URI outputURI = GfacUtils.createGsiftpURI(hostgridFTP, model.getOutputDataDir());
+
+ log.info("Host FTP = " + hostgridFTP);
+ log.info("temp directory = " + tmpdirURI);
+ log.info("Working directory = " + workingDirURI);
+ log.info("Input directory = " + inputURI);
+ log.info("Output directory = " + outputURI);
+
+ ftp.makeDir(tmpdirURI, gssCred);
+ ftp.makeDir(workingDirURI, gssCred);
+ ftp.makeDir(inputURI, gssCred);
+ ftp.makeDir(outputURI, gssCred);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void execute(InvocationContext invocationContext) throws GfacException {
+ ExecutionContext context = invocationContext.getExecutionContext();
+
+ String contact = null;
+ log.info("Searching for Gate Keeper");
+ GlobusGatekeeperType gatekeeper = context.getExecutionModel().getGatekeeper();
+ if (gatekeeper == null) {
+ contact = context.getExecutionModel().getHost();
+ } else {
+ contact = gatekeeper.getEndPointReference();
+ }
+ log.info("Using Globus GateKeeper " + contact);
+ GramJob job = null;
+ boolean jobSucsseful = false;
+
+ String rsl = "";
+ int errCode = 0;
+
+ try {
+ GSSCredential gssCred = ((GSISecurityContext) context.getSecurityContext()).getGssCredentails();
+
+ log.info("Host desc = " + context.getExecutionModel().getHostDesc().xmlText());
+
+ GramAttributes jobAttr = GramRSLGenerator.configureRemoteJob(context);
+ rsl = jobAttr.toRSL();
+ job = new GramJob(rsl);
+ job.setCredentials(gssCred);
+
+ log.info("RSL = " + rsl);
+
+ NotificationService notifier = context.getNotificationService();
+ DurationObj compObj = notifier.computationStarted();
+ StringBuffer buf = new StringBuffer();
+
+ JobSubmissionListener listener = new JobSubmissionListener(job, context);
+ job.addListener(listener);
+ log.info("Request to contact:" + contact);
+ // The first boolean is to specify the job is a batch job - use true
+ // for interactive and false for batch.
+ // the second boolean is to specify to use the full proxy and not
+ // delegate a limited proxy.
+ job.request(contact, false, false);
+
+ log.info("JobID = " + job.getIDAsString());
+
+ // Gram.request(contact, job, false, false);
+
+ buf.append("Finished launching job, Host = ").append(context.getExecutionModel().getHost()).append(" RSL = ").append(job.getRSL()).append("working directory =").append(context.getExecutionModel().getWorkingDir()).append("tempDirectory =").append(context.getExecutionModel().getTmpDir())
+ .append("Globus GateKeeper cantact = ").append(contact);
+ context.getNotificationService().info(buf.toString());
+ String gramJobid = job.getIDAsString();
+ context.getNotificationService().info("JobID=" + gramJobid);
+ log.info(buf.toString());
+ // Send Audit Notifications
+ notifier.appAudit(invocationContext.getServiceName(), new URI(job.getIDAsString()), contact, null, null, gssCred.getName().toString(), null, job.getRSL());
+
+ listener.waitFor();
+ job.removeListener(listener);
+
+ int jobStatus = listener.getStatus();
+ if (jobStatus == GramJob.STATUS_FAILED) {
+ errCode = listener.getError();
+ // Adding retry for error code to properties files as
+ // gfac.retryonJobErrorCodes with comma separated
+ if (context.getServiceContext().getGlobalConfiguration().getRetryonErrorCodes().contains(Integer.toString(errCode))) {
+ try {
+ log.info("Job Failed with Error code " + errCode + " and job id: " + gramJobid);
+ log.info("Retry job sumttion one more time for error code" + errCode);
+ job = new GramJob(rsl);
+ job.setCredentials(gssCred);
+ listener = new JobSubmissionListener(job, context);
+ job.addListener(listener);
+ job.request(contact, false, false);
+ String newGramJobid = job.getIDAsString();
+ String jobStatusMessage = GfacUtils.formatJobStatus(newGramJobid, "RETRY");
+ context.getNotificationService().info(jobStatusMessage);
+ context.getNotificationService().info("JobID=" + newGramJobid);
+ notifier.appAudit(context.getServiceContext().getService().getService().getServiceName().getStringValue(), new URI(job.getIDAsString()), contact, null, null, gssCred.getName().toString(), null, job.getRSL());
+ listener.waitFor();
+ job.removeListener(listener);
+ int jobStatus1 = listener.getStatus();
+ if (jobStatus1 == GramJob.STATUS_FAILED) {
+ int errCode1 = listener.getError();
+ String errorMsg = "Job " + job.getID() + " on host " + context.getExecutionModel().getHost() + " Error Code = " + errCode1;
+ String localHost = context.getServiceContext().getGlobalConfiguration().getLocalHost();
+ throw new JobSubmissionFault(new Exception(errorMsg), localHost, "", "", CurrentProviders.Gram);
+ }
+ } catch (Exception e) {
+ String localHost = context.getServiceContext().getGlobalConfiguration().getLocalHost();
+ throw new JobSubmissionFault(e, localHost, "", "", CurrentProviders.Gram);
+ }
+ } else {
+ String errorMsg = "Job " + job.getID() + " on host " + context.getExecutionModel().getHost() + " Error Code = " + errCode;
+ String localHost = context.getServiceContext().getGlobalConfiguration().getLocalHost();
+ GfacException error = new JobSubmissionFault(new Exception(errorMsg), localHost, contact, rsl, CurrentProviders.Gram);
+ if (errCode == 8) {
+ error.setFaultCode(ErrorCodes.JOB_CANCELED);
+ } else {
+ error.setFaultCode(ErrorCodes.JOB_FAILED);
+ }
+ // error.addProperty(ErrorCodes.JOB_TYPE,
+ // ErrorCodes.JobType.Gram.toString());
+ // error.addProperty(ErrorCodes.CONTACT, contact);
+ throw error;
+ }
+ }
+ notifier.computationFinished(compObj);
+
+ /*
+ * Stdout and Stderror
+ */
+ GridFtp ftp = new GridFtp();
+
+ // get Hostname
+ String hostgridFTP = null;
+
+ if (invocationContext.getExecutionContext().getExecutionModel().getHostDesc().getHostConfiguration().getGridFTPArray() != null && invocationContext.getExecutionContext().getExecutionModel().getHostDesc().getHostConfiguration().getGridFTPArray().length > 0) {
+ hostgridFTP = invocationContext.getExecutionContext().getExecutionModel().getHostDesc().getHostConfiguration().getGridFTPArray(0).getEndPointReference();
+ } else {
+ hostgridFTP = invocationContext.getExecutionContext().getExecutionModel().getHost();
+ }
+
+ URI stdoutURI = GfacUtils.createGsiftpURI(hostgridFTP, invocationContext.getExecutionContext().getExecutionModel().getStdOut());
+ URI stderrURI = GfacUtils.createGsiftpURI(hostgridFTP, invocationContext.getExecutionContext().getExecutionModel().getStderr());
+
+ System.out.println(stdoutURI);
+ System.out.println(stderrURI);
+
+ File logDir = new File("./service_logs");
+ if (!logDir.exists()) {
+ logDir.mkdir();
+ }
+
+ // Get the Stdouts and StdErrs
+ QName x = QName.valueOf(invocationContext.getServiceName());
+ String timeStampedServiceName = GfacUtils.createServiceDirName(x);
+ File localStdOutFile = new File(logDir, timeStampedServiceName + ".stdout");
+ File localStdErrFile = new File(logDir, timeStampedServiceName + ".stderr");
+
+ String stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile);
+ String stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile);
+
+ //set to context
+ OutputUtils.fillOutputFromStdout(invocationContext.getMessageContext("output"), stdout, stderr);
+
+
+ jobSucsseful = true;
+ } catch (GramException e) {
+ String localHost = "xxxx";
+ GfacException error = new JobSubmissionFault(e, localHost, contact, rsl, CurrentProviders.Gram);
+ if (errCode == 8) {
+ error.setFaultCode(ErrorCodes.JOB_CANCELED);
+ } else {
+ error.setFaultCode(ErrorCodes.JOB_FAILED);
+ }
+ // error.addProperty(ErrorCodes.JOB_TYPE,
+ // ErrorCodes.JobType.Gram.toString());
+ // error.addProperty(ErrorCodes.CONTACT, contact);
+ throw error;
+ } catch (GSSException e) {
+ String localHost = context.getServiceContext().getGlobalConfiguration().getLocalHost();
+ throw new JobSubmissionFault(e, localHost, contact, rsl, CurrentProviders.Gram);
+ } catch (URISyntaxException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ } catch (InterruptedException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ } finally {
+ if (job != null && !jobSucsseful) {
+ try {
+ job.cancel();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ }
+
+ public void dispose(InvocationContext invocationContext) throws GfacException {
+
+ }
+
+ public void abort(InvocationContext invocationContext) throws GfacException {
+ try {
+ ExecutionContext context = invocationContext.getExecutionContext();
+ GramJob job = new GramJob("");
+ job.setID(context.getExecutionModel().getJobID());
+ job.setCredentials(((GSISecurityContext) context.getSecurityContext()).getGssCredentails());
+ job.cancel();
+ } catch (MalformedURLException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ } catch (GramException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ } catch (GSSException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+
+ }
+
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/LocalProvider.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/LocalProvider.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/LocalProvider.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/LocalProvider.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,230 @@
+package org.apache.airavata.core.gfac.provider;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.airavata.core.gfac.context.ExecutionContext;
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.exception.JobSubmissionFault;
+import org.apache.airavata.core.gfac.exception.GfacException.FaultCode;
+import org.apache.airavata.core.gfac.notification.NotificationService;
+import org.apache.airavata.core.gfac.utils.GFacConstants;
+import org.apache.airavata.core.gfac.utils.GfacUtils;
+import org.apache.airavata.core.gfac.utils.OutputUtils;
+import org.apache.airavata.core.gfac.utils.GFacOptions.CurrentProviders;
+
+import edu.indiana.extreme.lead.workflow_tracking.common.DurationObj;
+
+public class LocalProvider extends AbstractProvider {
+
+ private static final String SPACE = " ";
+
+
+ private String buildCommand(List<String> cmdList){
+ StringBuffer buff = new StringBuffer();
+ for (String string : cmdList) {
+ buff.append(string);
+ buff.append(SPACE);
+ }
+ return buff.toString();
+ }
+
+ public void initialize(InvocationContext invocationContext) throws GfacException {
+ ExecutionContext context = invocationContext.getExecutionContext();
+
+ log.info("working diectroy = " + context.getExecutionModel().getWorkingDir());
+ log.info("temp directory = " + context.getExecutionModel().getTmpDir());
+ new File(context.getExecutionModel().getWorkingDir()).mkdir();
+ new File(context.getExecutionModel().getTmpDir()).mkdir();
+ new File(context.getExecutionModel().getInputDataDir()).mkdir();
+ new File(context.getExecutionModel().getOutputDataDir()).mkdir();
+ }
+
+ public void execute(InvocationContext invocationContext) throws GfacException {
+ ExecutionContext context = invocationContext.getExecutionContext();
+
+ List<String> cmdList = new ArrayList<String>();
+
+ try {
+ /*
+ * Notifier
+ */
+ NotificationService notifier = context.getNotificationService();
+
+ /*
+ * Builder Command
+ */
+ cmdList.add(context.getExecutionModel().getExecutable());
+ cmdList.addAll(context.getExecutionModel().getInputParameters());
+
+ //create process builder from command
+ ProcessBuilder builder = new ProcessBuilder(cmdList);
+
+
+ // get the env of the host and the application
+ Map<String, String> nv = context.getExecutionModel().getEnv();
+ builder.environment().putAll(nv);
+
+ // extra env's
+ builder.environment().put(GFacConstants.INPUT_DATA_DIR, context.getExecutionModel().getInputDataDir());
+ builder.environment().put(GFacConstants.OUTPUT_DATA_DIR, context.getExecutionModel().getOutputDataDir());
+
+ //working directory
+ builder.directory(new File(context.getExecutionModel().getWorkingDir()));
+
+
+ //log info
+ log.info("Command = " + buildCommand(cmdList));
+ log.info("Working dir = " + builder.directory());
+ for (String key : builder.environment().keySet()) {
+ log.info("Env[" + key + "] = " + builder.environment().get(key));
+ }
+
+ //notify start
+ DurationObj compObj = notifier.computationStarted();
+
+ //running cmd
+ Process process = builder.start();
+
+ final BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ final BufferedReader err = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+ final BufferedWriter stdoutWtiter = new BufferedWriter(new FileWriter(context.getExecutionModel().getStdOut()));
+ final BufferedWriter stdErrWtiter = new BufferedWriter(new FileWriter(context.getExecutionModel().getStderr()));
+
+ Thread t1 = new Thread(new Runnable() {
+
+ public void run() {
+ try {
+ String line=null;
+ while ( (line = in.readLine()) != null){
+ log.debug(line);
+ stdoutWtiter.write(line);
+ stdoutWtiter.newLine();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (in != null){
+ try {
+ in.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (stdoutWtiter != null) {
+ try {
+ stdoutWtiter.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ });
+
+ Thread t2 = new Thread(new Runnable() {
+
+ public void run() {
+ try {
+ String line=null;
+ while ((line = err.readLine()) != null){
+ log.debug(line);
+ stdErrWtiter.write(line);
+ stdErrWtiter.newLine();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (err != null){
+ try {
+ err.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ if (stdErrWtiter != null) {
+ try {
+ stdErrWtiter.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+ });
+
+ //start output threads
+ t1.setDaemon(true);
+ t2.setDaemon(true);
+ t1.start();
+ t2.start();
+
+
+ // wait for the process (application) to finish executing
+ int returnValue = process.waitFor();
+
+ // notify end
+ notifier.computationFinished(compObj);
+
+ // make sure other two threads are done
+ t1.join();
+ t2.join();
+
+ /*
+ * check return value. usually not very helpful to draw conclusions
+ * based on return values so don't bother. just provide warning in
+ * the log messages
+ */
+ if (returnValue != 0) {
+ log.error("Process finished with non zero return value. Process may have failed");
+ } else {
+ log.info("Process finished with return value of zero.");
+ }
+
+ StringBuffer buf = new StringBuffer();
+ buf.append("Executed ")
+ .append(buildCommand(cmdList))
+ .append(" on the localHost, working directory =")
+ .append(context.getExecutionModel().getWorkingDir())
+ .append("tempDirectory =")
+ .append(context.getExecutionModel().getTmpDir())
+ .append("With the status")
+ .append(String.valueOf(returnValue));
+ context.getNotificationService().info(buf.toString());
+
+ log.info(buf.toString());
+
+ context.getExecutionModel().setStdoutStr(GfacUtils.readFile(context.getExecutionModel().getStdOut()));
+ context.getExecutionModel().setStderrStr(GfacUtils.readFile(context.getExecutionModel().getStderr()));
+
+ //set to context
+ OutputUtils.fillOutputFromStdout(invocationContext.getMessageContext("output"), context.getExecutionModel().getStdoutStr(), context.getExecutionModel().getStderrStr());
+
+ } catch (IOException e) {
+ throw new JobSubmissionFault(e, "", "", buildCommand(cmdList), CurrentProviders.Local);
+ } catch (InterruptedException e) {
+ throw new GfacException(e, FaultCode.LocalError);
+ }
+
+ }
+
+ public void dispose(InvocationContext invocationContext) throws GfacException {
+
+ }
+
+ public void abort(InvocationContext invocationContext) throws GfacException {
+
+ }
+
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/Provider.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/Provider.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/Provider.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/Provider.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,15 @@
+package org.apache.airavata.core.gfac.provider;
+
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+
+public interface Provider {
+
+ void initialize(InvocationContext invocationContext) throws GfacException;
+
+ void execute(InvocationContext invocationContext) throws GfacException;
+
+ void dispose(InvocationContext invocationContext) throws GfacException;
+
+ void abort(InvocationContext invocationContext) throws GfacException;
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/SSHProvider.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/SSHProvider.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/SSHProvider.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/SSHProvider.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,228 @@
+package org.apache.airavata.core.gfac.provider;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.namespace.QName;
+
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.connection.channel.direct.Session.Command;
+import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
+
+import org.apache.airavata.core.gfac.context.ExecutionContext;
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.model.ExecutionModel;
+import org.apache.airavata.core.gfac.notification.NotificationService;
+import org.apache.airavata.core.gfac.utils.GFacConstants;
+import org.apache.airavata.core.gfac.utils.GfacUtils;
+import org.apache.airavata.core.gfac.utils.OutputUtils;
+
+import edu.indiana.extreme.lead.workflow_tracking.common.DurationObj;
+
+public class SSHProvider extends AbstractProvider {
+
+ private static final String SPACE = " ";
+
+ private String buildCommand(List<String> cmdList) {
+ StringBuffer buff = new StringBuffer();
+ for (String string : cmdList) {
+ buff.append(string);
+ buff.append(SPACE);
+ }
+ return buff.toString();
+ }
+
+ @Override
+ public void initialize(InvocationContext invocationContext) throws GfacException {
+ ExecutionContext appExecContext = invocationContext.getExecutionContext();
+ ExecutionModel model = appExecContext.getExecutionModel();
+
+ SSHClient ssh = new SSHClient();
+ try {
+ ssh.loadKnownHosts();
+ ssh.connect(model.getHost());
+
+ // TODO how to authenticate with system
+ ssh.authPublickey(System.getProperty("user.name"));
+ final Session session = ssh.startSession();
+ try {
+ StringBuilder command = new StringBuilder();
+ command.append("mkdir -p ");
+ command.append(model.getTmpDir());
+ command.append(" | ");
+ command.append("mkdir -p ");
+ command.append(model.getWorkingDir());
+ command.append(" | ");
+ command.append("mkdir -p ");
+ command.append(model.getInputDataDir());
+ command.append(" | ");
+ command.append("mkdir -p ");
+ command.append(model.getOutputDataDir());
+ Command cmd = session.exec(command.toString());
+ cmd.join(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ try {
+ session.close();
+ } catch (Exception e) {
+ }
+ }
+ } catch (Exception e) {
+ throw new GfacException(e.getMessage(), e);
+ } finally {
+ try {
+ ssh.disconnect();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ @Override
+ public void execute(InvocationContext invocationContext) throws GfacException {
+ ExecutionContext context = invocationContext.getExecutionContext();
+ ExecutionModel model = context.getExecutionModel();
+
+ List<String> cmdList = new ArrayList<String>();
+
+ SSHClient ssh = new SSHClient();
+ try {
+
+ /*
+ * Notifier
+ */
+ NotificationService notifier = context.getNotificationService();
+
+ /*
+ * Builder Command
+ */
+ cmdList.add(context.getExecutionModel().getExecutable());
+ cmdList.addAll(context.getExecutionModel().getInputParameters());
+
+ // create process builder from command
+ String command = buildCommand(cmdList);
+
+ //redirect StdOut and StdErr
+ command += SPACE + "1>" + SPACE + model.getStdOut();
+ command += SPACE + "2>" + SPACE + model.getStderr();
+
+ // get the env of the host and the application
+ Map<String, String> nv = context.getExecutionModel().getEnv();
+
+ // extra env's
+ nv.put(GFacConstants.INPUT_DATA_DIR, context.getExecutionModel().getInputDataDir());
+ nv.put(GFacConstants.OUTPUT_DATA_DIR, context.getExecutionModel().getOutputDataDir());
+
+ // log info
+ log.info("Command = " + buildCommand(cmdList));
+ for (String key : nv.keySet()) {
+ log.info("Env[" + key + "] = " + nv.get(key));
+ }
+
+ // notify start
+ DurationObj compObj = notifier.computationStarted();
+
+ /*
+ * Create ssh connection
+ */
+ ssh.loadKnownHosts();
+ ssh.connect(model.getHost());
+
+ // TODO how to authenticate with system
+ ssh.authPublickey(System.getProperty("user.name"));
+
+ final Session session = ssh.startSession();
+ try {
+ /*
+ * Build working Directory
+ */
+ log.info("WorkingDir = " + model.getWorkingDir());
+ session.exec("mkdir -p " + model.getWorkingDir());
+ session.exec("cd " + model.getWorkingDir());
+
+ /*
+ * Set environment
+ */
+ for (String key : nv.keySet()) {
+ session.setEnvVar(key, nv.get(key));
+ }
+
+ /*
+ * Execute
+ */
+ Command cmd = session.exec(command);
+ log.info("stdout=" + GfacUtils.readFromStream(session.getInputStream()));
+ cmd.join(5, TimeUnit.SECONDS);
+
+
+ // notify end
+ notifier.computationFinished(compObj);
+
+ /*
+ * check return value. usually not very helpful to draw conclusions
+ * based on return values so don't bother. just provide warning in
+ * the log messages
+ */
+ if (cmd.getExitStatus() != 0) {
+ log.error("Process finished with non zero return value. Process may have failed");
+ } else {
+ log.info("Process finished with return value of zero.");
+ }
+
+ File logDir = new File("./service_logs");
+ if (!logDir.exists()) {
+ logDir.mkdir();
+ }
+
+ // Get the Stdouts and StdErrs
+ QName x = QName.valueOf(invocationContext.getServiceName());
+ String timeStampedServiceName = GfacUtils.createServiceDirName(x);
+ File localStdOutFile = new File(logDir, timeStampedServiceName + ".stdout");
+ File localStdErrFile = new File(logDir, timeStampedServiceName + ".stderr");
+
+ SCPFileTransfer fileTransfer = ssh.newSCPFileTransfer();
+ fileTransfer.download(model.getStdOut(), localStdOutFile.getAbsolutePath());
+ fileTransfer.download(model.getStderr(), localStdErrFile.getAbsolutePath());
+
+ context.getExecutionModel().setStdoutStr(GfacUtils.readFile(localStdOutFile.getAbsolutePath()));
+ context.getExecutionModel().setStderrStr(GfacUtils.readFile(localStdErrFile.getAbsolutePath()));
+
+ // set to context
+ OutputUtils.fillOutputFromStdout(invocationContext.getMessageContext("output"), context.getExecutionModel().getStdoutStr(), context.getExecutionModel().getStderrStr());
+
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ try {
+ session.close();
+ } catch (Exception e) {
+ }
+ }
+ } catch (Exception e) {
+ throw new GfacException(e.getMessage(), e);
+ } finally {
+ try {
+ ssh.disconnect();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ @Override
+ public void dispose(InvocationContext invocationContext) throws GfacException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void abort(InvocationContext invocationContext) throws GfacException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,145 @@
+package org.apache.airavata.core.gfac.provider.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.airavata.core.gfac.context.ExecutionContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.exception.GfacException.FaultCode;
+import org.apache.airavata.core.gfac.utils.GFacConstants;
+import org.globus.gram.GramAttributes;
+import org.ogce.namespaces.x2010.x08.x30.workflowContextHeader.WorkflowContextHeaderDocument.WorkflowContextHeader;
+import org.ogce.namespaces.x2010.x08.x30.workflowResourceMapping.ResourceMappingDocument.ResourceMapping;
+import org.ogce.schemas.gfac.documents.ApplicationDescriptionType;
+import org.ogce.schemas.gfac.documents.RSLParmType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class GramRSLGenerator {
+ protected final static Logger log = LoggerFactory.getLogger(GramRSLGenerator.class);
+ private enum JobType{SINGLE,MPI,MULTIPLE,CONDOR};
+
+ public static GramAttributes configureRemoteJob(ExecutionContext appExecContext) throws GfacException {
+ GramAttributes jobAttr = new GramAttributes();
+ jobAttr.setExecutable(appExecContext.getExecutionModel().getExecutable());
+ jobAttr.setDirectory(appExecContext.getExecutionModel().getWorkingDir());
+ jobAttr.setStdout(appExecContext.getExecutionModel().getStdOut());
+ jobAttr.setStderr(appExecContext.getExecutionModel().getStderr());
+
+ // The env here contains the env of the host and the application. i.e
+ // the env specified in
+ // the host description and application description documents
+ Map<String, String> nv = appExecContext.getExecutionModel().getEnv();
+
+ for (String key : nv.keySet()) {
+ jobAttr.addEnvVariable(key, nv.get(key));
+ }
+
+ jobAttr.addEnvVariable(GFacConstants.INPUT_DATA_DIR, appExecContext.getExecutionModel().getInputDataDir());
+ String outputDataDir = GFacConstants.OUTPUT_DATA_DIR;
+ if(!outputDataDir.isEmpty()){
+ jobAttr.addEnvVariable(outputDataDir, appExecContext.getExecutionModel().getOutputDataDir());
+ }
+ ApplicationDescriptionType app = appExecContext.getExecutionModel().getAplicationDesc();
+ WorkflowContextHeader contextHeader = appExecContext.getWorkflowHeader();
+ ResourceMapping resourceMapping = null;
+ if (contextHeader != null) {
+ resourceMapping = contextHeader.getResourceMappings().getResourceMappingArray(0);
+ }
+
+ log.info("Configure using App Desc = " + app.xmlText());
+ if (resourceMapping != null && resourceMapping.getMaxWallTime() > 0) {
+ log.info("Header Setting Max Wall Time" + resourceMapping.getMaxWallTime());
+ jobAttr.setMaxWallTime(resourceMapping.getMaxWallTime());
+
+ } else if (app.getMaxWallTime() > 0) {
+ log.info("Setting max wall clock time to " + app.getMaxWallTime());
+
+ if (app.getMaxWallTime() > 30 && app.getQueue() != null && app.getQueue().equals("debug")) {
+ throw new GfacException("NCSA debug Queue only support jobs < 30 minutes", FaultCode.InvalidConfig);
+ }
+
+ jobAttr.setMaxWallTime(app.getMaxWallTime());
+ jobAttr.set("proxy_timeout", "1");
+ } else {
+ jobAttr.setMaxWallTime(29);
+ }
+
+ if (appExecContext.getExecutionModel().getStdIn() != null) {
+ jobAttr.setStdin(appExecContext.getExecutionModel().getStdIn());
+ } else {
+ Iterator<String> values = appExecContext.getExecutionModel().getInputParameters().iterator();
+ while (values.hasNext()) {
+ jobAttr.addArgument(values.next());
+ }
+ }
+
+ if (resourceMapping != null && resourceMapping.getNodeCount() > 0) {
+ log.info("Setting number of procs to " + resourceMapping.getNodeCount());
+ jobAttr.set("hostCount", String.valueOf(resourceMapping.getNodeCount()));
+ }else if (app.getHostCount() > 1) {
+ jobAttr.set("hostCount", String.valueOf(app.getHostCount()));
+ }
+ if (resourceMapping != null && resourceMapping.getCpuCount() > 0) {
+ log.info("Setting host count to " + resourceMapping.getCpuCount());
+ jobAttr.setNumProcs(resourceMapping.getCpuCount());
+
+ } else if (app.getCount() > 1) {
+ log.info("Setting number of procs to " + app.getCount());
+ jobAttr.setNumProcs(app.getCount());
+ }
+
+ if (app.getProject() != null && app.getProject().getProjectName() != null) {
+ log.info("Setting project to " + app.getProject());
+ jobAttr.setProject(app.getProject().getProjectName());
+ }
+
+ if (resourceMapping != null && resourceMapping.getQueueName() != null) {
+ jobAttr.setQueue(resourceMapping.getQueueName());
+ } else if (app.getQueue() != null && app.getQueue().getQueueName() != null) {
+ log.info("Setting job queue to " + app.getQueue());
+ jobAttr.setQueue(app.getQueue().getQueueName());
+ }
+ String jobType = JobType.SINGLE.toString() ;
+
+ if (app.getJobType() != null) {
+ jobType = app.getJobType().toString();
+ }
+ if (jobType.equalsIgnoreCase(JobType.SINGLE.toString())) {
+ log.info("Setting job type to single");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_SINGLE);
+ } else if (jobType.equalsIgnoreCase(JobType.MPI.toString())) {
+ log.info("Setting job type to mpi");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_MPI);
+ } else if (jobType.equalsIgnoreCase(JobType.MULTIPLE.toString())) {
+ log.info("Setting job type to multiple");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_MULTIPLE);
+ } else if (jobType.equalsIgnoreCase(JobType.CONDOR.toString())) {
+ jobAttr.setJobType(GramAttributes.JOBTYPE_CONDOR);
+ }
+
+ // Support to add the Additional RSL parameters
+ RSLParmType[] rslParams = app.getRslparmArray();
+ if (rslParams.length > 0) {
+ for (RSLParmType rslType : rslParams) {
+ log.info("Adding rsl param of [" + rslType.getName() + "," + rslType.getStringValue() + "]");
+ if (rslType.getName() != "") {
+ jobAttr.set(rslType.getName(), rslType.getStringValue());
+ }
+ }
+ }
+
+ // support urgency/SPRUCE case
+ // only add spruce rsl parameter if this host has a spruce jobmanager
+ // configured
+ if (appExecContext.getWorkflowHeader() != null && appExecContext.getWorkflowHeader().getURGENCY() != null
+ //&& GfacUtils.getSpruceGatekeeper(appExecContext) != null
+ ) {
+ jobAttr.set("urgency", appExecContext.getWorkflowHeader().getURGENCY());
+ }
+
+ return jobAttr;
+ }
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,93 @@
+package org.apache.airavata.core.gfac.provider.utils;
+
+import org.apache.airavata.core.gfac.context.ExecutionContext;
+import org.apache.airavata.core.gfac.context.impl.GSISecurityContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.utils.GfacUtils;
+import org.globus.gram.GramException;
+import org.globus.gram.GramJob;
+import org.globus.gram.GramJobListener;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobSubmissionListener implements GramJobListener {
+
+ private boolean finished;
+ private int error;
+ private int status;
+ private ExecutionContext executionContext;
+ private GramJob job;
+ protected final Logger log = LoggerFactory.getLogger(JobSubmissionListener.class);
+
+ public JobSubmissionListener(GramJob job, ExecutionContext executionContext) {
+ this.job = job;
+ this.executionContext = executionContext;
+ }
+ // waits for DONE or FAILED status
+ public void waitFor() throws InterruptedException,GSSException, GfacException, GramException {
+ while (!finished) {
+ int proxyExpTime = job.getCredentials().getRemainingLifetime();
+ if(proxyExpTime < 900){
+ log.info("Job proxy expired. Trying to renew proxy");
+ GSSCredential newgssCred = ((GSISecurityContext)executionContext.getSecurityContext()).getGssCredentails();
+ job.renew(newgssCred);
+ }
+ // job status is changed but method isn't invoked
+ if (status != 0) {
+ if (job.getStatus() != status) {
+ log.info("invoke method manually");
+ statusChanged(job);
+ } else {
+ log.info("job " + job.getIDAsString() + " have same status: " + GramJob.getStatusAsString(status));
+ }
+ } else {
+ log.info("Status is zero");
+ }
+
+ synchronized (this) {
+ wait(60 * 1000l);
+ }
+ }
+ }
+
+ public synchronized void statusChanged(GramJob job) {
+ int jobStatus = job.getStatus();
+ String jobId = job.getIDAsString();
+ String statusString = job.getStatusAsString();
+ String jobStatusMessage = GfacUtils.formatJobStatus(jobId, statusString);
+ log.info(jobStatusMessage);
+ status = jobStatus;
+ executionContext.getNotificationService().info(jobStatusMessage);
+ if (jobStatus == GramJob.STATUS_DONE) {
+ finished = true;
+ } else if (jobStatus == GramJob.STATUS_FAILED) {
+ finished = true;
+ error = job.getError();
+ log.info("Job Error Code: " + error);
+ }
+
+ if (finished) {
+ notify();
+ }
+ }
+
+
+ public int getError() {
+ return error;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void wakeup() {
+ try {
+ notify();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/registry/RegistryService.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/registry/RegistryService.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/registry/RegistryService.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/registry/RegistryService.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,46 @@
+
+package org.apache.airavata.core.gfac.registry;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.core.gfac.exception.GfacException;
+
+
+public interface RegistryService {
+
+ public void registerConcreteWsdl(String wsdlAsStr,int lifetimeAsSeconds)throws GfacException;
+ public String getConcreateWsdl(String wsdlQName)throws GfacException;
+ public void removeConcreteWsdl(String wsdlQName)throws GfacException;
+
+ public String getAbstractWsdl(String wsdlQName) throws GfacException;
+ public void removeAwsdl(String wsdlQName) throws GfacException;
+
+ public void registerServiceMap(String serviceMapAsStr,String abstractWsdlAsString)throws GfacException;
+ public void removeServiceMap(String serviceQName)throws GfacException;
+ public String getServiceMap(String serviceQName)throws GfacException;
+
+ public void registerHostDesc(String hostDescAsStr)throws GfacException;
+ public String getHostDesc(String hostName)throws GfacException;
+ public void removeHostDesc(String hostName)throws GfacException;
+
+ public void registerAppDesc(String appDescAsStr)throws GfacException;
+ public String getAppDesc(String appQName,String hostName)throws GfacException;
+ public void removeAppDesc(String appQName,String hostName)throws GfacException;
+
+ public void registerOutputFiles(QName resourceId, String resourceName, String resourceType,
+ String resourceDesc, String resourceDocument, String resourceParentTypedID,
+ String owner)throws GfacException;
+
+ public String[] findService(String serviceName)throws GfacException;
+ public String[] findServiceDesc(String serviceName)throws GfacException;
+ public String[] findAppDesc(String query) throws GfacException;
+
+ public String[] listHosts()throws GfacException;
+ public String[] listApps()throws GfacException;
+ public String[] app2Hosts(String appName)throws GfacException;
+ public String[] listAwsdl()throws GfacException;
+
+ public boolean isAuthorizedToAcsses(String resourceID, String actor, String action) throws GfacException;
+
+}
+
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/registry/impl/XregistryServiceWrapper.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/registry/impl/XregistryServiceWrapper.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/registry/impl/XregistryServiceWrapper.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/registry/impl/XregistryServiceWrapper.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,397 @@
+package org.apache.airavata.core.gfac.registry.impl;
+
+import java.io.File;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.exception.GfacException.FaultCode;
+import org.apache.airavata.core.gfac.registry.RegistryService;
+import org.apache.xmlbeans.impl.values.XmlValueOutOfRangeException;
+import org.globus.gsi.TrustedCertificates;
+import org.ietf.jgss.GSSCredential;
+import org.ogce.xregistry.client.XRegistryClient;
+import org.ogce.xregistry.doc.AppData;
+import org.ogce.xregistry.doc.DocData;
+import org.ogce.xregistry.utils.XRegistryClientException;
+
+import xregistry.generated.CapabilityToken;
+import xregistry.generated.FindAppDescResponseDocument;
+import xregistry.generated.HostDescData;
+import xregistry.generated.ServiceDescData;
+import xregistry.generated.WsdlData;
+
+public class XregistryServiceWrapper implements RegistryService {
+
+ private XRegistryClient xregistryClient;
+
+ public XregistryServiceWrapper(String xregistryUrl, String trustedCertFile, GSSCredential sessionCredentail) throws GfacException {
+ try {
+ if (sessionCredentail != null && trustedCertFile != null) {
+ if (new File(trustedCertFile).isFile()) {
+ this.xregistryClient = new XRegistryClient(sessionCredentail, trustedCertFile, xregistryUrl);
+ } else {
+ TrustedCertificates certificates = TrustedCertificates.load(trustedCertFile);
+ TrustedCertificates.setDefaultTrustedCertificates(certificates);
+ X509Certificate[] trustedCertificates = certificates.getCertificates();
+ System.out.println("xregistryUrl=" + xregistryUrl);
+ System.out.println("trustedCertificates=" + trustedCertificates);
+ this.xregistryClient = new XRegistryClient(sessionCredentail, trustedCertificates, xregistryUrl);
+ }
+ } else {
+ throw new GfacException("Neither host certificate of gss credential is set", FaultCode.ErrorAtDependentService);
+ }
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public XregistryServiceWrapper(String xregistryUrl, String trustedCertFile, String hostcertsKeyFile) throws GfacException {
+ try {
+ if (hostcertsKeyFile != null && trustedCertFile != null) {
+ if (new File(trustedCertFile).isFile()) {
+ this.xregistryClient = new XRegistryClient(hostcertsKeyFile, trustedCertFile, xregistryUrl);
+ } else {
+ TrustedCertificates certificates = TrustedCertificates.load(trustedCertFile);
+ TrustedCertificates.setDefaultTrustedCertificates(certificates);
+ X509Certificate[] trustedCertificates = certificates.getCertificates();
+ System.out.println("xregistryUrl=" + xregistryUrl);
+ System.out.println("hostcertsKeyFile=" + hostcertsKeyFile);
+ System.out.println("trustedCertificates=" + trustedCertificates);
+ this.xregistryClient = new XRegistryClient(hostcertsKeyFile, trustedCertificates, xregistryUrl);
+ }
+ } else {
+ throw new GfacException("Neither host certificate of gss credential is set", FaultCode.ErrorAtDependentService);
+ }
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String[] app2Hosts(String appName) throws GfacException {
+ try {
+ return xregistryClient.app2Hosts(appName);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String[] findAppDesc(String query) throws GfacException {
+ try {
+
+ FindAppDescResponseDocument.FindAppDescResponse.AppData[] xregAppDesc = xregistryClient.findAppDesc(query);
+ AppData[] appDesc = null;
+ if (xregAppDesc != null) {
+ List<AppData> appDescList = new ArrayList<AppData>();
+ for (int i = 0; i < xregAppDesc.length; i++) {
+ try {
+ FindAppDescResponseDocument.FindAppDescResponse.AppData xbeansData = xregAppDesc[i];
+ AppData resultAppData = new AppData(xbeansData.getName(), xbeansData.getOwner(), xbeansData.getHostName());
+ resultAppData.allowedAction = xbeansData.getAllowedAction();
+ resultAppData.resourceID = xbeansData.getName();
+ appDescList.add(resultAppData);
+ } catch (XmlValueOutOfRangeException e) {
+ throw new GfacException("Problem with retrieving object : " + e.getLocalizedMessage(), FaultCode.ErrorAtDependentService);
+ }
+ }
+ appDesc = appDescList.toArray(new AppData[0]);
+ } else {
+ return null;
+ }
+
+ String[] finalResults = new String[appDesc.length];
+ for (int i = 0; i < appDesc.length; i++) {
+ finalResults[i] = appDesc[i].name + "#" + appDesc[i].secondryName;
+ }
+ return finalResults;
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String[] findService(String serviceName) throws GfacException {
+ try {
+ WsdlData[] serviceInstanceData = xregistryClient.findServiceInstance(serviceName);
+ String[] results = new String[serviceInstanceData.length];
+ for (int i = 0; i < serviceInstanceData.length; i++) {
+ try {
+ results[i] = serviceInstanceData[i].getName().toString();
+ } catch (XmlValueOutOfRangeException e) {
+ throw new GfacException("Problem with retrieving object : " + e.getLocalizedMessage(), FaultCode.ErrorAtDependentService);
+ }
+ }
+ return results;
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String[] findServiceDesc(String serviceName) throws GfacException {
+ try {
+ ServiceDescData[] serviceDescData = xregistryClient.findServiceDesc(serviceName);
+ String[] results = new String[serviceDescData.length];
+ for (int i = 0; i < serviceDescData.length; i++) {
+ try {
+ results[i] = serviceDescData[i].getName().toString();
+ } catch (XmlValueOutOfRangeException e) {
+ throw new GfacException("Problem with retrieving object : " + e.getLocalizedMessage(), FaultCode.ErrorAtDependentService);
+ }
+ }
+ return results;
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String getAbstractWsdl(String wsdlQName) throws GfacException {
+ try {
+ return xregistryClient.getAbstractWsdl(QName.valueOf(wsdlQName));
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String getAppDesc(String appQName, String hostName) throws GfacException {
+ try {
+ return xregistryClient.getAppDesc(appQName, hostName);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String getConcreateWsdl(String wsdlQName) throws GfacException {
+ try {
+ return xregistryClient.getConcreateWsdl(QName.valueOf(wsdlQName));
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String getHostDesc(String hostName) throws GfacException {
+ try {
+ return xregistryClient.getHostDesc(hostName);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String getServiceMap(String serviceQName) throws GfacException {
+ try {
+ return xregistryClient.getServiceDesc(QName.valueOf(serviceQName));
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public String[] listApps() throws GfacException {
+ return findAppDesc("");
+ }
+
+ public String[] listAwsdl() throws GfacException {
+ return findServiceDesc("");
+ }
+
+ public String[] listHosts() throws GfacException {
+ try {
+ HostDescData[] hostDescData = xregistryClient.findHosts("");
+ String[] results = new String[hostDescData.length];
+ for (int i = 0; i < hostDescData.length; i++) {
+ try {
+ results[i] = hostDescData[i].getName().toString();
+ } catch (XmlValueOutOfRangeException e) {
+ throw new GfacException("Problem with retrieving object : " + e.getLocalizedMessage(), FaultCode.ErrorAtDependentService);
+ }
+ }
+ return results;
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void registerAppDesc(String appDescAsStr) throws GfacException {
+ try {
+ xregistryClient.registerAppDesc(appDescAsStr);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void registerConcreteWsdl(String wsdlAsStr, int lifetimeAsSeconds) throws GfacException {
+ try {
+ xregistryClient.registerConcreteWsdl(wsdlAsStr, lifetimeAsSeconds);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void registerOutputFiles(QName resourceId, String resourceName, String resourceType, String resourceDesc, String resourceDocument, String resourceParentTypedID, String owner) throws GfacException {
+ try {
+ xregistryClient.registerOGCEResource(resourceId, resourceName, resourceType, resourceDesc, resourceDocument, resourceParentTypedID, owner);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void registerHostDesc(String hostDescAsStr) throws GfacException {
+ try {
+ xregistryClient.registerHostDesc(hostDescAsStr);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void registerServiceMap(String serviceMapAsStr, String abstractWsdlAsString) throws GfacException {
+ try {
+ xregistryClient.registerServiceDesc(serviceMapAsStr, abstractWsdlAsString);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void removeAppDesc(String appQName, String hostName) throws GfacException {
+ try {
+ xregistryClient.removeAppDesc(QName.valueOf(appQName), hostName);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void removeAwsdl(String wsdlQName) throws GfacException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void removeConcreteWsdl(String wsdlQName) throws GfacException {
+ try {
+ xregistryClient.removeConcreteWsdl(QName.valueOf(wsdlQName));
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void removeHostDesc(String hostName) throws GfacException {
+ try {
+ xregistryClient.removeHostDesc(hostName);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void removeServiceMap(String serviceQName) throws GfacException {
+ try {
+ xregistryClient.removeServiceDesc(QName.valueOf(serviceQName));
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public boolean isAuthorizedToAcsses(String resourceID, String actor, String action) throws GfacException {
+ try {
+ return xregistryClient.isAuthorizedToAcsses(null, resourceID, actor, action);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void addCapability(String resource, String actor, boolean isUser, String action) throws GfacException {
+ try {
+ xregistryClient.addCapability(resource, actor, isUser, action);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public CapabilityToken[] getCapability(String resource, String actor, boolean isUser, String action) throws GfacException {
+ try {
+ return xregistryClient.findCapability(resource, actor, isUser, action);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public void removeCapability(String resourceID, String actor) throws GfacException {
+ try {
+ xregistryClient.removeCapability(resourceID, actor);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public AppData[] xfindAppDesc(String query) throws GfacException {
+ try {
+ xregistry.generated.FindAppDescResponseDocument.FindAppDescResponse.AppData[] xregAppDesc = xregistryClient.findAppDesc(query);
+ AppData[] appDesc = null;
+ if (xregAppDesc != null) {
+ List<AppData> appDescList = new ArrayList<AppData>();
+ for (int i = 0; i < xregAppDesc.length; i++) {
+ try {
+ xregistry.generated.FindAppDescResponseDocument.FindAppDescResponse.AppData xbeansData = xregAppDesc[i];
+ AppData resultAppData = new AppData(xbeansData.getName(), xbeansData.getOwner(), xbeansData.getHostName());
+ resultAppData.allowedAction = xbeansData.getAllowedAction();
+ resultAppData.resourceID = xbeansData.getName();
+ appDescList.add(resultAppData);
+ } catch (XmlValueOutOfRangeException e) {
+ throw new GfacException("Problem with retrieving object : " + e.getLocalizedMessage(), FaultCode.ErrorAtDependentService);
+ }
+ }
+ appDesc = appDescList.toArray(new AppData[0]);
+ } else {
+ return null;
+ }
+ return appDesc;
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public DocData[] xfindHostDesc(String query) throws GfacException {
+ try {
+ HostDescData[] hostDescData = xregistryClient.findHosts(query);
+ if (hostDescData == null) {
+ return null;
+ }
+
+ List<DocData> results = new ArrayList<DocData>();
+ for (int i = 0; i < hostDescData.length; i++) {
+ try {
+ HostDescData host = hostDescData[i];
+ DocData data = new DocData(new QName(host.getResourceID()), host.getOwner());
+ data.allowedAction = host.getAllowedAction();
+ data.resourceID = new QName(host.getResourceID());
+ results.add(data);
+ } catch (XmlValueOutOfRangeException e) {
+ throw new GfacException("Problem with retrieving object : " + e.getLocalizedMessage(), FaultCode.ErrorAtDependentService);
+ }
+ }
+ return results.toArray(new DocData[0]);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+ public DocData[] xfindServiceDesc(String query) throws GfacException {
+ try {
+ ServiceDescData[] serviceDescData = xregistryClient.findServiceDesc(query);
+ if (serviceDescData == null) {
+ return null;
+ }
+ List<DocData> results = new ArrayList<DocData>();
+ for (int i = 0; i < serviceDescData.length; i++) {
+ try {
+ DocData data = new DocData(serviceDescData[i].getName(), serviceDescData[i].getOwner());
+ data.allowedAction = serviceDescData[i].getAllowedAction();
+ data.resourceID = serviceDescData[i].getName();
+ results.add(data);
+ } catch (XmlValueOutOfRangeException e) {
+ throw new GfacException("Problem with retrieving object : " + e.getLocalizedMessage(), FaultCode.ErrorAtDependentService);
+ }
+ }
+ return results.toArray(new DocData[0]);
+ } catch (XRegistryClientException e) {
+ throw new GfacException(e, FaultCode.ErrorAtDependentService);
+ }
+ }
+
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/scheduler/Scheduler.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/scheduler/Scheduler.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/scheduler/Scheduler.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,9 @@
+package org.apache.airavata.core.gfac.scheduler;
+
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.provider.Provider;
+
+public interface Scheduler {
+ public Provider schedule(InvocationContext context) throws GfacException;
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/scheduler/impl/SchedulerImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/scheduler/impl/SchedulerImpl.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/scheduler/impl/SchedulerImpl.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/scheduler/impl/SchedulerImpl.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,83 @@
+package org.apache.airavata.core.gfac.scheduler.impl;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.exception.GfacException.FaultCode;
+import org.apache.airavata.core.gfac.provider.GramProvider;
+import org.apache.airavata.core.gfac.provider.LocalProvider;
+import org.apache.airavata.core.gfac.provider.Provider;
+import org.apache.airavata.core.gfac.registry.RegistryService;
+import org.apache.airavata.core.gfac.scheduler.Scheduler;
+import org.apache.airavata.core.gfac.utils.GfacUtils;
+import org.apache.xmlbeans.XmlException;
+import org.ogce.schemas.gfac.documents.ServiceMapDocument;
+import org.ogce.schemas.gfac.documents.ServiceMapType;
+
+public class SchedulerImpl implements Scheduler {
+
+ public Provider schedule(InvocationContext context) throws GfacException {
+
+ String hostName = null;
+
+ /*
+ * Load host and app description from registry
+ */
+ RegistryService registryService = context.getExecutionContext().getRegistryService();
+ String serviceMapStr = registryService.getServiceMap(context.getServiceName());
+
+ if (serviceMapStr != null) {
+ try {
+
+ ServiceMapType serviceMap = ServiceMapDocument.Factory.parse(serviceMapStr).getServiceMap();
+ QName appName = GfacUtils.findApplcationName(serviceMap);
+
+ // host name
+ hostName = findHostFromServiceMap(registryService, appName);
+
+ } catch (XmlException e) {
+ throw new GfacException(e, FaultCode.InitalizationError);
+ }
+ } else {
+ throw new GfacException("Service Map for " + context.getServiceName() + " does not found on resource Catalog " + registryService, FaultCode.InvalidRequest);
+ }
+
+
+ /*
+ * Determine provider
+ */
+ if (GfacUtils.isLocalHost(hostName)) {
+ return new LocalProvider();
+ } else {
+ // set Security context for executionContext
+ if (context.getSecurityContext(GramProvider.MYPROXY_SECURITY_CONTEXT) != null) {
+ context.getExecutionContext().setSecurityContext(context.getSecurityContext(GramProvider.MYPROXY_SECURITY_CONTEXT));
+ } else {
+ throw new GfacException("Cannot get security context to run on Gram", FaultCode.InvalidRequest);
+ }
+
+ return new GramProvider();
+ }
+ }
+
+ private String findHostFromServiceMap(RegistryService regService, QName appName) throws GfacException {
+
+ System.out.println("Searching registry for some deployed application hosts\n");
+ String[] hosts = regService.app2Hosts(appName.toString());
+ if (hosts.length > 1) {
+ String hostNames = "";
+ for (int i = 0; i < hosts.length; i++) {
+ hostNames = hostNames + hosts[i];
+ }
+ System.out.println("Application deployed on more than one machine. The full Host list is " + hostNames + "\n");
+ }
+ if (hosts.length >= 1) {
+ System.out.println("Found Host = " + hosts[0]);
+ return hosts[0];
+ } else {
+ System.out.println("Applcation " + appName.getLocalPart() + " not found in registry");
+ return null;
+ }
+ }
+}
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/services/GenericService.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/services/GenericService.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/services/GenericService.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/services/GenericService.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,13 @@
+package org.apache.airavata.core.gfac.services;
+
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+
+public interface GenericService {
+
+ public void init() throws GfacException;
+
+ public void execute(InvocationContext context) throws GfacException;
+
+ public void dispose() throws GfacException;
+}
\ No newline at end of file
Added: incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/services/impl/AbstractSimpleService.java
URL: http://svn.apache.org/viewvc/incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/services/impl/AbstractSimpleService.java?rev=1139241&view=auto
==============================================================================
--- incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/services/impl/AbstractSimpleService.java (added)
+++ incubator/airavata/core/trunk/gfac/src/main/java/org/apache/airavata/core/gfac/services/impl/AbstractSimpleService.java Fri Jun 24 11:01:15 2011
@@ -0,0 +1,95 @@
+package org.apache.airavata.core.gfac.services.impl;
+
+import org.apache.airavata.core.gfac.context.InvocationContext;
+import org.apache.airavata.core.gfac.exception.GfacException;
+import org.apache.airavata.core.gfac.extension.DataServiceChain;
+import org.apache.airavata.core.gfac.extension.ExitableChain;
+import org.apache.airavata.core.gfac.extension.PostExecuteChain;
+import org.apache.airavata.core.gfac.extension.PreExecuteChain;
+import org.apache.airavata.core.gfac.provider.Provider;
+import org.apache.airavata.core.gfac.scheduler.Scheduler;
+import org.apache.airavata.core.gfac.services.GenericService;
+
+public abstract class AbstractSimpleService implements GenericService {
+
+ public abstract void preProcess(InvocationContext context) throws GfacException;
+
+ public abstract void postProcess(InvocationContext context) throws GfacException;
+
+ public abstract Scheduler getScheduler(InvocationContext context) throws GfacException;
+
+ public abstract PreExecuteChain[] getPreExecutionSteps(InvocationContext context) throws GfacException;
+
+ public abstract PostExecuteChain[] getPostExecuteSteps(InvocationContext context) throws GfacException;
+
+ public abstract DataServiceChain[] getDataChains(InvocationContext context) throws GfacException;
+
+ public final void execute(InvocationContext context) throws GfacException {
+
+ /*
+ * Pre-Process
+ */
+ preProcess(context);
+
+ /*
+ * Determine provider
+ */
+ Provider provider = getScheduler(context).schedule(context);
+
+ /*
+ * Load data necessary data
+ */
+ buildChains(getDataChains(context)).start(context);
+
+ /*
+ * Init
+ */
+ provider.initialize(context);
+
+ /*
+ * Pre-Execution
+ */
+ buildChains(getPreExecutionSteps(context)).start(context);
+
+ /*
+ * Execute
+ */
+ provider.execute(context);
+
+ /*
+ * Post-Execution
+ */
+ buildChains(getPostExecuteSteps(context)).start(context);
+
+ /*
+ * Destroy
+ */
+ provider.dispose(context);
+
+ /*
+ * Pre-Process
+ */
+ postProcess(context);
+ }
+
+ private ExitableChain buildChains(ExitableChain[] list) {
+
+ /*
+ * Validation check and return doing-nothing chain object
+ */
+ if (list == null || list.length == 0) {
+ return new ExitableChain() {
+ @Override
+ protected boolean execute(InvocationContext context) {
+ return true;
+ }
+ };
+ }
+
+ ExitableChain currentPoint = list[0];
+ for (int i = 1; i < list.length; i++) {
+ currentPoint = currentPoint.setNext(list[i]);
+ }
+ return currentPoint;
+ }
+}