You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/10 20:57:16 UTC
airavata git commit: initial stage changes on data streaming
Repository: airavata
Updated Branches:
refs/heads/master b00b8b23f -> dd32d065e
initial stage changes on data streaming
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dd32d065
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dd32d065
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dd32d065
Branch: refs/heads/master
Commit: dd32d065ea8d9bfedf80cdf48db1a2704dbcafb2
Parents: b00b8b2
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Nov 10 14:57:04 2015 -0500
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Nov 10 14:57:04 2015 -0500
----------------------------------------------------------------------
.../gfac/impl/task/DataStreamingTask.java | 113 +++++++++++
.../gfac/impl/task/utils/StreamData.java | 200 +++++++++++++++++++
2 files changed, 313 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/dd32d065/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStreamingTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStreamingTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStreamingTask.java
new file mode 100644
index 0000000..da29a26
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStreamingTask.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.task.utils.StreamData;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Timer;
+
+/**
+ * Task that schedules periodic streaming of a process output while the parent process is in
+ * the {@link ProcessState#OUTPUT_DATA_STAGING} state.
+ *
+ * <p>The user name, host name and input path are read from the property map handed to
+ * {@link #init(Map)}. The transfer itself is performed by {@link StreamData}, which this task
+ * schedules on a {@link Timer}.</p>
+ */
+public class DataStreamingTask implements Task {
+    private static final Logger log = LoggerFactory.getLogger(DataStreamingTask.class);
+
+    /**
+     * Number of intervals the wall time limit is divided into. Streaming is only scheduled when
+     * the wall time limit is larger than this value, which guarantees a period of at least one.
+     */
+    private static final int STREAMING_INTERVALS = 10;
+
+    /** Milliseconds per minute, kept as a {@code long} so the period computation cannot overflow. */
+    private static final long MILLIS_PER_MINUTE = 60L * 1000L;
+
+    private String userName;
+    private String hostName;
+    private String inputPath;
+
+    /**
+     * Reads the {@code inputPath}, {@code hostName} and {@code userName} entries from the given
+     * property map. Missing entries leave the corresponding field {@code null}.
+     *
+     * @param propertyMap task configuration properties
+     * @throws TaskException declared by the {@link Task} contract; not thrown by this implementation
+     */
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        inputPath = propertyMap.get("inputPath");
+        hostName = propertyMap.get("hostName");
+        userName = propertyMap.get("userName");
+    }
+
+    /**
+     * Schedules periodic streaming of the task's process output when the parent process is in
+     * the output data staging state and the output is flagged for streaming.
+     *
+     * @param taskContext context of the task being executed
+     * @return a status that is never {@code null}: {@code FAILED} when the output has no value or
+     *         the sub task model cannot be read, {@code COMPLETED} when streaming was scheduled,
+     *         and {@code EXECUTING} when there was nothing to schedule
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+        try {
+            TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+            final DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+                    (taskContext.getTaskModel());
+            if (processState != ProcessState.OUTPUT_DATA_STAGING) {
+                return status;
+            }
+
+            OutputDataObjectType processOutput = taskContext.getProcessOutput();
+            if (processOutput == null) {
+                return status;
+            }
+            if (processOutput.getValue() == null) {
+                log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                        taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                        processOutput.getName());
+                status = new TaskStatus(TaskState.FAILED);
+                if (processOutput.isIsRequired()) {
+                    // The previous message claimed the bit was "not set" in the branch where it is set.
+                    status.setReason("File name is null, but this output's isRequired bit is set");
+                } else {
+                    status.setReason("File name is null");
+                }
+                return status;
+            }
+
+            if (processOutput.isOutputStreaming()) {
+                // stream output periodically
+                ComputationalResourceSchedulingModel resourceSchedule =
+                        taskContext.getParentProcessContext().getProcessModel().getResourceSchedule();
+                int wallTimeLimit = resourceSchedule.getWallTimeLimit();
+                if (wallTimeLimit > STREAMING_INTERVALS) {
+                    // The period is multiplied by a minute below, i.e. the wall time limit is
+                    // treated as a number of minutes.
+                    int period = wallTimeLimit / STREAMING_INTERVALS;
+                    // NOTE(review): this Timer is never cancelled, so the streaming task keeps
+                    // firing for the lifetime of the JVM - confirm where it should be stopped.
+                    Timer timer = new Timer();
+                    StreamData streamData = new StreamData(userName, hostName, inputPath, taskContext, subTaskModel);
+                    timer.schedule(streamData, 0, MILLIS_PER_MINUTE * period);
+                    status.setState(TaskState.COMPLETED);
+                }
+            }
+            // Return the computed status instead of null so callers can inspect the outcome.
+            return status;
+        } catch (TException e) {
+            log.error("Error while creating data streaming task", e);
+            TaskStatus failed = new TaskStatus(TaskState.FAILED);
+            failed.setReason("Error while creating data streaming task: " + e.getMessage());
+            return failed;
+        }
+    }
+
+    /**
+     * Recovery is not implemented yet.
+     *
+     * @param taskContext context of the task being recovered
+     * @return always {@code null}
+     */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        // NOTE(review): callers that dereference the result will fail - confirm intended behaviour.
+        return null;
+    }
+
+    /**
+     * The task type is not declared yet.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public TaskTypes getType() {
+        // NOTE(review): presumably this should be the data staging task type - confirm.
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dd32d065/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
new file mode 100644
index 0000000..fe5f8b7
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task.utils;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.TimerTask;
+
+public class StreamData extends TimerTask {
+ private static final Logger log = LoggerFactory.getLogger(StreamData.class);
+ private static final int DEFAULT_SSH_PORT = 22;
+ private String publicKeyPath;
+ private String passPhrase;
+ private String privateKeyPath;
+ private String userName;
+ private String hostName;
+ private String inputPath;
+ private TaskContext taskContext;
+ private DataStagingTaskModel subTaskModel;
+
+ public StreamData(String userName, String hostName, String inputPath, TaskContext taskContext, DataStagingTaskModel subTaskModel) {
+ this.userName = userName;
+ this.hostName = hostName;
+ this.inputPath = inputPath;
+ this.taskContext = taskContext;
+ this.subTaskModel = subTaskModel;
+ }
+
+ @Override
+ public void run() {
+ try {
+ runOutputStaging();
+ } catch (URISyntaxException e) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Erroneous path specified",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ taskContext.getProcessOutput().getName());
+ } catch (IllegalAccessException | InstantiationException | AiravataException | IOException | JSchException | SSHApiException e) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error occurred while streaming data",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ taskContext.getProcessOutput().getName());
+ } catch (CredentialStoreException e) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error occurred while connecting with credential store",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ taskContext.getProcessOutput().getName());
+ }
+ }
+
+ public void runOutputStaging() throws URISyntaxException, IllegalAccessException, InstantiationException, CredentialStoreException, AiravataException, IOException, JSchException, SSHApiException {
+ try {
+
+ AuthenticationInfo authenticationInfo = null;
+ URI sourceURI = new URI(subTaskModel.getSource());
+ String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+ sourceURI.getPath().length());
+ URI destinationURI = null;
+ if (subTaskModel.getDestination().startsWith("dummy")) {
+ destinationURI = getDestinationURI(taskContext, fileName);
+ subTaskModel.setDestination(destinationURI.toString());
+ } else {
+ destinationURI = new URI(subTaskModel.getDestination());
+ }
+
+ if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
+ && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
+ localDataCopy(taskContext, sourceURI, destinationURI);
+ }
+
+ String tokenId = taskContext.getParentProcessContext().getTokenId();
+ CredentialReader credentialReader = GFacUtils.getCredentialReader();
+ Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId);
+ if (credential instanceof SSHCredential) {
+ SSHCredential sshCredential = (SSHCredential) credential;
+ byte[] publicKey = sshCredential.getPublicKey();
+ publicKeyPath = writeFileToDisk(publicKey);
+ byte[] privateKey = sshCredential.getPrivateKey();
+ privateKeyPath = writeFileToDisk(privateKey);
+ passPhrase = sshCredential.getPassphrase();
+ authenticationInfo = getSSHKeyAuthentication();
+ } else {
+ String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
+ log.error(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(msg);
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ }
+
+ ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+ Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+ String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
+ SSHUtils.makeDirectory(targetPath, sshSession);
+ // TODO - save updated subtask model with new destination
+ outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+ } catch (GFacException e) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error while output staging",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ taskContext.getProcessOutput().getName());
+ throw new AiravataException("Error while output staging",e);
+ }
+ }
+
+ private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException {
+ StringBuilder sb = new StringBuilder("rsync -cr ");
+ sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
+ CommandInfo commandInfo = new RawCommandInfo(sb.toString());
+ taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+ }
+
+
+ public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException {
+ String filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
+ taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+ return new URI("SCP", hostName, filePath, null);
+
+ }
+
+ private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
+ throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
+
+ /**
+ * scp third party file transfer 'from' comute resource.
+ */
+ taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
+ // update output locations
+ GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+ GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+
+ }
+
+ private String writeFileToDisk(byte[] data) {
+ File temp = null;
+ try {
+ temp = File.createTempFile("id_rsa", "");
+ //write it
+ FileOutputStream bw = new FileOutputStream(temp);
+ bw.write(data);
+ bw.close();
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ return temp.getAbsolutePath();
+ }
+
+ private SSHKeyAuthentication getSSHKeyAuthentication() {
+ SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
+ sshKA.setUserName(userName);
+ sshKA.setPassphrase(passPhrase);
+ sshKA.setPrivateKeyFilePath(privateKeyPath);
+ sshKA.setPublicKeyFilePath(publicKeyPath);
+ sshKA.setStrictHostKeyChecking("no");
+ return sshKA;
+ }
+}