You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/10 20:57:16 UTC

airavata git commit: initial stage changes on data streaming

Repository: airavata
Updated Branches:
  refs/heads/master b00b8b23f -> dd32d065e


initial stage changes on data streaming


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dd32d065
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dd32d065
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dd32d065

Branch: refs/heads/master
Commit: dd32d065ea8d9bfedf80cdf48db1a2704dbcafb2
Parents: b00b8b2
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Nov 10 14:57:04 2015 -0500
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Nov 10 14:57:04 2015 -0500

----------------------------------------------------------------------
 .../gfac/impl/task/DataStreamingTask.java       | 113 +++++++++++
 .../gfac/impl/task/utils/StreamData.java        | 200 +++++++++++++++++++
 2 files changed, 313 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/dd32d065/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStreamingTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStreamingTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStreamingTask.java
new file mode 100644
index 0000000..da29a26
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStreamingTask.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.task.utils.StreamData;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Timer;
+
+/** Task that schedules periodic streaming of a process output during output data staging. */
+public class DataStreamingTask implements Task {
+    private static final Logger log = LoggerFactory.getLogger(DataStreamingTask.class);
+    private String userName;
+    private String hostName;
+    private String inputPath;
+    /** Reads the "inputPath", "hostName" and "userName" entries from the property map. */
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        inputPath = propertyMap.get("inputPath");
+        hostName = propertyMap.get("hostName");
+        userName = propertyMap.get("userName");
+    }
+
+    /**
+     * Schedules a {@link StreamData} timer when the output requests streaming. Never returns null: FAILED on a
+     * null output value or an unreadable sub task model, COMPLETED once scheduled, otherwise EXECUTING.
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+        try {
+            TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+            final DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+                    (taskContext.getTaskModel());
+            OutputDataObjectType processOutput = processState == ProcessState.OUTPUT_DATA_STAGING
+                    ? taskContext.getProcessOutput() : null;
+            if (processOutput == null) {
+                return status; // not in the output staging state, or no output to stream
+            }
+            if (processOutput.getValue() == null) {
+                log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                        taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                        processOutput.getName());
+                status = new TaskStatus(TaskState.FAILED);
+                status.setReason(processOutput.isIsRequired()
+                        ? "File name is null, but this output's isRequired bit is set" : "File name is null");
+                return status;
+            }
+            if (processOutput.isOutputStreaming()) {
+                ComputationalResourceSchedulingModel resourceSchedule = taskContext.getParentProcessContext().getProcessModel().getResourceSchedule();
+                int wallTimeLimit = resourceSchedule.getWallTimeLimit();
+                if (wallTimeLimit > 10) {
+                    // a tenth of the wall time limit, treated as minutes; long arithmetic avoids int overflow
+                    long periodMillis = 60L * 1000L * (wallTimeLimit / 10);
+                    Timer timer = new Timer(true); // daemon: the timer is never cancelled and must not block JVM exit
+                    timer.schedule(new StreamData(userName, hostName, inputPath, taskContext, subTaskModel), 0, periodMillis);
+                    status.setState(TaskState.COMPLETED);
+                }
+            }
+            return status;
+        } catch (TException e) {
+            log.error("Error while creating data streaming task", e);
+            TaskStatus failed = new TaskStatus(TaskState.FAILED);
+            failed.setReason("Error while creating data streaming task: " + e.getMessage());
+            return failed;
+        }
+    }
+
+    /** Recovery is not implemented yet; this always returns null. */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        return null;
+    }
+
+    /** No task type is assigned yet; this always returns null. */
+    @Override
+    public TaskTypes getType() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dd32d065/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
new file mode 100644
index 0000000..fe5f8b7
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task.utils;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.TimerTask;
+
+/** Timer task that copies a process output from the compute resource to its staging destination. */
+public class StreamData extends TimerTask  {
+    private static final Logger log = LoggerFactory.getLogger(StreamData.class);
+    private static final int DEFAULT_SSH_PORT = 22;
+    private String publicKeyPath;
+    private String passPhrase;
+    private String privateKeyPath;
+    private String userName;
+    private String hostName;
+    private String inputPath;
+    private TaskContext taskContext;
+    private DataStagingTaskModel subTaskModel;
+
+    public StreamData(String userName, String hostName, String inputPath, TaskContext taskContext, DataStagingTaskModel subTaskModel) {
+        this.userName = userName;
+        this.hostName = hostName;
+        this.inputPath = inputPath;
+        this.taskContext = taskContext;
+        this.subTaskModel = subTaskModel;
+    }
+
+    /** Runs one staging pass; checked failures are logged together with their cause and not rethrown. */
+    @Override
+    public void run() {
+        try {
+            runOutputStaging();
+        } catch (URISyntaxException e) {
+            logFailure("Erroneous path specified", e);
+        } catch (IllegalAccessException | InstantiationException | AiravataException | IOException | JSchException | SSHApiException e) {
+            logFailure("Error occurred while streaming data", e);
+        } catch (CredentialStoreException e) {
+            logFailure("Error occurred while connecting with credential store", e);
+        }
+    }
+
+    /** Logs a staging failure with the experiment, process and task ids, the output name and the cause. */
+    private void logFailure(String reason, Throwable cause) {
+        log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , {}", taskContext.getExperimentId(),
+                taskContext.getProcessId(), taskContext.getTaskId(), taskContext.getProcessOutput().getName(), reason, cause);
+    }
+
+    /**
+     * Resolves the destination (generated when it starts with "dummy"), runs an rsync on the cluster if
+     * source and destination share host and user, then stages the file through a third-party scp.
+     */
+    public void runOutputStaging() throws URISyntaxException, IllegalAccessException, InstantiationException, CredentialStoreException, AiravataException, IOException, JSchException, SSHApiException {
+        try {
+            URI sourceURI = new URI(subTaskModel.getSource());
+            // URI paths are always '/'-separated, whatever the platform's File.separator is
+            String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf('/') + 1);
+            URI destinationURI;
+            if (subTaskModel.getDestination().startsWith("dummy")) {
+                destinationURI = getDestinationURI(taskContext, fileName);
+                subTaskModel.setDestination(destinationURI.toString());
+            } else {
+                destinationURI = new URI(subTaskModel.getDestination());
+            }
+
+            if (sameIgnoreCase(sourceURI.getHost(), destinationURI.getHost())
+                    && sameIgnoreCase(sourceURI.getUserInfo(), destinationURI.getUserInfo())) {
+                localDataCopy(taskContext, sourceURI, destinationURI);
+                // NOTE(review): execution continues with the scp transfer below - confirm no return is intended
+            }
+
+            String tokenId = taskContext.getParentProcessContext().getTokenId();
+            CredentialReader credentialReader = GFacUtils.getCredentialReader();
+            Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId);
+            if (!(credential instanceof SSHCredential)) {
+                String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
+                log.error(msg);
+                ErrorModel errorModel = new ErrorModel();
+                errorModel.setActualErrorMessage(msg);
+                errorModel.setUserFriendlyMessage(msg);
+                taskContext.getTaskModel().setTaskError(errorModel);
+                return; // no authentication info can be built, so do not attempt to open an SSH session
+            }
+            SSHCredential sshCredential = (SSHCredential) credential;
+            // write both keys to temp files; the authentication object refers to them by path
+            publicKeyPath = writeFileToDisk(sshCredential.getPublicKey());
+            privateKeyPath = writeFileToDisk(sshCredential.getPrivateKey());
+            passPhrase = sshCredential.getPassphrase();
+            AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
+
+            ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+            String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
+            SSHUtils.makeDirectory(targetPath, sshSession);
+            // TODO - save updated subtask model with new destination
+            outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+        } catch (GFacException e) {
+            logFailure("Error while output staging", e);
+            throw new AiravataException("Error while output staging",e);
+        }
+    }
+
+    /** Null-safe, case-insensitive comparison; a null on either side counts as "different". */
+    private static boolean sameIgnoreCase(String left, String right) {
+        return left != null && left.equalsIgnoreCase(right);
+    }
+
+    /** Runs "rsync -cr source destination" on the parent process's remote cluster. */
+    private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException {
+        CommandInfo commandInfo = new RawCommandInfo("rsync -cr " + sourceURI.getPath() + " " + destinationURI.getPath());
+        taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+    }
+
+    /** Builds the SCP URI {@code <inputPath>/<processId>/<fileName>} on the configured host. */
+    public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException {
+        // URI paths are '/'-separated on every platform, so File.separator is not used here
+        String filePath = (inputPath.endsWith("/") ? inputPath : inputPath + "/") +
+                taskContext.getParentProcessContext().getProcessId() + "/" + fileName;
+        return new URI("SCP", hostName, filePath, null);
+    }
+
+    /** Transfers the file from the compute resource via third-party scp, then records the new output location. */
+    private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
+            throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
+        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
+        // update output locations
+        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+    }
+
+    /** Writes the key bytes to a new temp file and returns its absolute path; I/O failures propagate. */
+    private String writeFileToDisk(byte[] data) throws IOException {
+        File temp = File.createTempFile("id_rsa", "");
+        temp.deleteOnExit(); // key material should not outlive the JVM
+        try (FileOutputStream out = new FileOutputStream(temp)) {
+            out.write(data);
+        }
+        return temp.getAbsolutePath();
+    }
+
+    /** Assembles key-based SSH authentication from the user name, passphrase and key file paths. */
+    private SSHKeyAuthentication getSSHKeyAuthentication() {
+        SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
+        sshKA.setUserName(userName);
+        sshKA.setPassphrase(passPhrase);
+        sshKA.setPrivateKeyFilePath(privateKeyPath);
+        sshKA.setPublicKeyFilePath(publicKeyPath);
+        // NOTE(review): strict host key checking is switched off here - confirm this is acceptable
+        sshKA.setStrictHostKeyChecking("no");
+        return sshKA;
+    }
+}