You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/06/06 17:38:32 UTC

airavata git commit: Yaml implementation for sub types in data staging

Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt ead87d8fe -> f37895ca5


Yaml implementation for sub types in data staging


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f37895ca
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f37895ca
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f37895ca

Branch: refs/heads/feature-workload-mgmt
Commit: f37895ca57e46d23e76b292eb796b171e64059f3
Parents: ead87d8
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Tue Jun 6 13:38:23 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Tue Jun 6 13:38:23 2017 -0400

----------------------------------------------------------------------
 .../config/DataStagingConfigException.java      | 31 +++++++++
 .../config/DataStagingYamlConfig.java           | 68 ++++++++++++++++++++
 .../config/DataTransferTaskConfig.java          | 60 +++++++++++++++++
 .../datastaging/facade/DataStagingFacade.java   | 34 ++++++++++
 .../datastaging/utils/DataStagingFactory.java   | 34 ++++++++++
 .../src/main/resources/data-staging-config.yaml | 27 ++++++++
 6 files changed, 254 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/f37895ca/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataStagingConfigException.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataStagingConfigException.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataStagingConfigException.java
new file mode 100644
index 0000000..a0b7ea7
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataStagingConfigException.java
@@ -0,0 +1,31 @@
+package org.apache.airavata.worker.task.datastaging.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Ajinkya on 6/1/17.
+ */
+public class DataStagingConfigException extends Exception {
+
+    private static final Logger log = LoggerFactory.getLogger(DataStagingConfigException.class);
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+
+    public DataStagingConfigException(String s) {
+        super(s);
+    }
+
+    public DataStagingConfigException(Exception e) {
+        super(e);
+        log.error(e.getMessage(),e);
+    }
+
+    public DataStagingConfigException(String s, Throwable throwable) {
+        super(s, throwable);
+        log.error(s,throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37895ca/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataStagingYamlConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataStagingYamlConfig.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataStagingYamlConfig.java
new file mode 100644
index 0000000..3db3144
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataStagingYamlConfig.java
@@ -0,0 +1,68 @@
+package org.apache.airavata.worker.task.datastaging.config;
+
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by Ajinkya on 6/1/17.
+ */
+public class DataStagingYamlConfig {
+
+    private static final String TASK_CLASS = "taskClass";
+    private static final String FILE_TRANSFER_TASKS = "fileTransferTasks";
+    private static final String TRANSFER_PROTOCOL = "transferProtocol";
+    private static final String PROPERTIES = "properties";
+    private List<DataTransferTaskConfig> fileTransferTasks = new ArrayList<>();
+
+    public DataStagingYamlConfig() throws DataStagingConfigException {
+        InputStream resourceAsStream = DataStagingYamlConfig.class.getClassLoader().
+                getResourceAsStream("data-staging-config.yaml");
+        parse(resourceAsStream);
+    }
+
+    private void parse(InputStream resourceAsStream) throws DataStagingConfigException {
+        if (resourceAsStream == null) {
+            throw new DataStagingConfigException("Configuration file{data-staging-config.yaml} is not fund");
+        }
+        Yaml yaml = new Yaml();
+        Object load = yaml.load(resourceAsStream);
+        if (load == null) {
+            throw new DataStagingConfigException("Yaml configuration object null");
+        }
+
+        if (load instanceof Map) {
+            Map<String, Object> loadMap = (Map<String, Object>) load;
+            String identifier;
+
+            List<Map<String, Object>> fileTransYamls = (List<Map<String, Object>>) loadMap.get(FILE_TRANSFER_TASKS);
+            DataTransferTaskConfig dataTransferTaskConfig;
+            if (fileTransYamls != null) {
+                for (Map<String, Object> fileTransConfig : fileTransYamls) {
+                    dataTransferTaskConfig = new DataTransferTaskConfig();
+                    identifier = ((String) fileTransConfig.get(TRANSFER_PROTOCOL));
+                    dataTransferTaskConfig.setTransferProtocol(DataMovementProtocol.valueOf(identifier));
+                    dataTransferTaskConfig.setTaskClass(((String) fileTransConfig.get(TASK_CLASS)));
+                    Object propertiesObj = fileTransConfig.get(PROPERTIES);
+                    List propertiesList;
+                    if (propertiesObj instanceof List) {
+                        propertiesList = (List) propertiesObj;
+                        if (propertiesList.size() > 0) {
+                            Map<String, String> props = (Map<String, String>) propertiesList.get(0);
+                            dataTransferTaskConfig.addProperties(props);
+                        }
+                    }
+                    fileTransferTasks.add(dataTransferTaskConfig);
+                }
+            }
+        }
+    }
+
+    public List<DataTransferTaskConfig> getFileTransferTasks() {
+        return fileTransferTasks;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37895ca/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataTransferTaskConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataTransferTaskConfig.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataTransferTaskConfig.java
new file mode 100644
index 0000000..9a238e8
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/config/DataTransferTaskConfig.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.task.datastaging.config;
+
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DataTransferTaskConfig {
+	private DataMovementProtocol transferProtocol;
+	private String taskClass;
+	private Map<String,String> properties = new HashMap<>();
+
+
+	public DataMovementProtocol getTransferProtocol() {
+		return transferProtocol;
+	}
+
+	public void setTransferProtocol(DataMovementProtocol transferProtocol) {
+		this.transferProtocol = transferProtocol;
+	}
+
+	public String getTaskClass() {
+		return taskClass;
+	}
+
+	public void setTaskClass(String taskClass) {
+		this.taskClass = taskClass;
+	}
+
+	public void addProperty(String key, String value) {
+		properties.put(key, value);
+	}
+
+	public void addProperties(Map<String, String> propMap) {
+		propMap.forEach(properties::put);
+	}
+
+	public Map<String,String> getProperties(){
+		return properties;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37895ca/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/facade/DataStagingFacade.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/facade/DataStagingFacade.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/facade/DataStagingFacade.java
new file mode 100644
index 0000000..a8889ad
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/facade/DataStagingFacade.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.worker.task.datastaging.facade;
+
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+
+import java.util.Map;
+
+/**
+ * Created by Ajinkya on 6/1/17.
+ */
+public class DataStagingFacade implements Task {
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        return null;
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        return null;
+    }
+
+    @Override
+    public TaskTypes getType() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37895ca/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
index 51996b8..f1a0df8 100644
--- a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
@@ -2,13 +2,25 @@ package org.apache.airavata.worker.task.datastaging.utils;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.core.config.TaskImplementationConfig;
+import org.apache.airavata.worker.core.config.WorkerYamlConfigruation;
 import org.apache.airavata.worker.core.context.ProcessContext;
 import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
 import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.apache.airavata.worker.task.datastaging.config.DataStagingConfigException;
+import org.apache.airavata.worker.task.datastaging.config.DataStagingYamlConfig;
+import org.apache.airavata.worker.task.datastaging.config.DataTransferTaskConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Created by Ajinkya on 4/13/17.
  */
@@ -16,6 +28,28 @@ public class DataStagingFactory {
 
     private static final Logger log = LoggerFactory.getLogger(DataStagingFactory.class);
 
+    private static boolean isDataStagingConfigurationLoaded = false;
+    private static Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>();
+
+    public static void loadConfiguration() throws DataStagingConfigException {
+        if (!isDataStagingConfigurationLoaded) {
+            DataStagingYamlConfig config = new DataStagingYamlConfig();
+            try {
+                for (DataTransferTaskConfig dataTransferTaskConfig : config.getFileTransferTasks()) {
+                    String taskClass = dataTransferTaskConfig.getTaskClass();
+                    Class<?> aClass = Class.forName(taskClass);
+                    Constructor<?> constructor = aClass.getConstructor();
+                    Task task = (Task) constructor.newInstance();
+                    task.init(dataTransferTaskConfig.getProperties());
+                    dataMovementTask.put(dataTransferTaskConfig.getTransferProtocol(), task);
+                }
+            }catch (Exception e) {
+                throw new DataStagingConfigException("Data staging config issue", e);
+            }
+            isDataStagingConfigurationLoaded =  true;
+        }
+    }
+
     public static SSHKeyAuthentication getComputerResourceSSHKeyAuthentication(ProcessContext pc)
             throws WorkerException, CredentialStoreException {
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37895ca/modules/worker/task-datastaging/src/main/resources/data-staging-config.yaml
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/resources/data-staging-config.yaml b/modules/worker/task-datastaging/src/main/resources/data-staging-config.yaml
new file mode 100644
index 0000000..5d6f805
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/resources/data-staging-config.yaml
@@ -0,0 +1,27 @@
+##################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+################################################################
+
+fileTransferTasks:
+  - transferProtocol: SCP
+    taskClass: org.apache.airavata.worker.task.datastaging.impl.SCPDataStageTask
+  - transferProtocol: LOCAL
+    taskClass: org.apache.airavata.worker.task.datastaging.impl.SCPDataStageTask
+
+