You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/05 16:44:28 UTC
[4/8] changing package names of gfac implementations
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
new file mode 100644
index 0000000..2c4c956
--- /dev/null
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.schemas.gfac.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+public class GFACGSISSHUtils {
+ private final static Logger logger = LoggerFactory.getLogger(GFACGSISSHUtils.class);
+
+ public static final String PBS_JOB_MANAGER = "pbs";
+ public static final String SLURM_JOB_MANAGER = "slurm";
+ public static final String SUN_GRID_ENGINE_JOB_MANAGER = "sge";
+
+ public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
+ RequestData requestData = new RequestData("default");
+ GSISecurityContext context = null;
+ try {
+ //todo fix this
+ context = new GSISecurityContext(null, requestData);
+ } catch (Exception e) {
+ throw new GFacException("An error occurred while creating GSI security context", e);
+ }
+ HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription();
+ if (registeredHost.getType() instanceof GlobusHostType || registeredHost.getType() instanceof UnicoreHostType
+ || registeredHost.getType() instanceof SSHHostType) {
+ logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
+ } else if (registeredHost.getType() instanceof GsisshHostType) {
+ GSIAuthenticationInfo authenticationInfo
+ = new MyProxyAuthenticationInfo(requestData.getMyProxyUserName(), requestData.getMyProxyPassword(), requestData.getMyProxyServerUrl(),
+ requestData.getMyProxyPort(), requestData.getMyProxyLifeTime(), System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY));
+ GsisshHostType gsisshHostType = (GsisshHostType) registeredHost.getType();
+ ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), registeredHost.getType().getHostAddress(),
+ gsisshHostType.getPort());
+
+ Cluster pbsCluster = null;
+ try {
+ JobManagerConfiguration jConfig = null;
+ String installedParentPath = ((HpcApplicationDeploymentType)
+ jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
+ String jobManager = ((GsisshHostType) registeredHost.getType()).getJobManager();
+ if (jobManager == null) {
+ logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+ } else {
+ if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+ } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+ } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getSGEJobManager(installedParentPath);
+ }
+ }
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
+ } catch (SSHApiException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ context.setPbsCluster(pbsCluster);
+ }
+ jobExecutionContext.addSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT,context);
+ }
+ public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext,
+ ApplicationDeploymentDescriptionType app, Cluster cluster) {
+ JobDescriptor jobDescriptor = new JobDescriptor();
+ // this is common for any application descriptor
+ jobDescriptor.setInputDirectory(app.getInputDataDirectory());
+ jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
+ jobDescriptor.setExecutablePath(app.getExecutableLocation());
+ jobDescriptor.setStandardOutFile(app.getStandardOutput());
+ jobDescriptor.setStandardErrorFile(app.getStandardError());
+ Random random = new Random();
+ int i = random.nextInt();
+ jobDescriptor.setJobName(app.getApplicationName().getStringValue() + String.valueOf(i));
+ jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory());
+
+
+ List<String> inputValues = new ArrayList<String>();
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Map<String, Object> inputs = input.getParameters();
+ Set<String> keys = inputs.keySet();
+ for (String paramName : keys) {
+ ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+ if ("URIArray".equals(actualParameter.getType().getType().toString()) || "StringArray".equals(actualParameter.getType().getType().toString())
+ || "FileArray".equals(actualParameter.getType().getType().toString())) {
+ String[] values = null;
+ if (actualParameter.getType() instanceof URIArrayType) {
+ values = ((URIArrayType) actualParameter.getType()).getValueArray();
+ } else if (actualParameter.getType() instanceof StringArrayType) {
+ values = ((StringArrayType) actualParameter.getType()).getValueArray();
+ } else if (actualParameter.getType() instanceof FileArrayType) {
+ values = ((FileArrayType) actualParameter.getType()).getValueArray();
+ }
+ String value = StringUtil.createDelimiteredString(values, " ");
+ inputValues.add(value);
+ } else {
+ String paramValue = MappingFactory.toString(actualParameter);
+ inputValues.add(paramValue);
+ }
+ }
+ jobDescriptor.setInputValues(inputValues);
+
+ // this part will fill out the hpcApplicationDescriptor
+ if (app instanceof HpcApplicationDeploymentType) {
+ HpcApplicationDeploymentType applicationDeploymentType
+ = (HpcApplicationDeploymentType) app;
+ jobDescriptor.setShellName("/bin/bash");
+ jobDescriptor.setAllEnvExport(true);
+ jobDescriptor.setMailOptions("n");
+ jobDescriptor.setNodes(applicationDeploymentType.getNodeCount());
+ jobDescriptor.setProcessesPerNode(applicationDeploymentType.getProcessorsPerNode());
+ jobDescriptor.setMaxWallTime(String.valueOf(applicationDeploymentType.getMaxWallTime()));
+ jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
+ if (applicationDeploymentType.getProjectAccount() != null) {
+ if (applicationDeploymentType.getProjectAccount().getProjectAccountNumber() != null) {
+ jobDescriptor.setAcountString(applicationDeploymentType.getProjectAccount().getProjectAccountNumber());
+ }
+ }
+ if (applicationDeploymentType.getQueue() != null) {
+ if (applicationDeploymentType.getQueue().getQueueName() != null) {
+ jobDescriptor.setQueueName(applicationDeploymentType.getQueue().getQueueName());
+ }
+ }
+ jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
+ TaskDetails taskData = jobExecutionContext.getTaskData();
+ if (taskData != null && taskData.isSetTaskScheduling()) {
+ ComputationalResourceScheduling computionnalResource = taskData.getTaskScheduling();
+ if (computionnalResource.getNodeCount() > 0) {
+ jobDescriptor.setNodes(computionnalResource.getNodeCount());
+ }
+ if (computionnalResource.getComputationalProjectAccount() != null) {
+ jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount());
+ }
+ if (computionnalResource.getQueueName() != null) {
+ jobDescriptor.setQueueName(computionnalResource.getQueueName());
+ }
+ if (computionnalResource.getTotalCPUCount() > 0) {
+ jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount());
+ }
+ if (computionnalResource.getWallTimeLimit() > 0) {
+ jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit()));
+ }
+ }
+
+ }
+ return jobDescriptor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHDirectorySetupHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHDirectorySetupHandler.java
deleted file mode 100644
index 1b9741d..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHDirectorySetupHandler.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class GSISSHDirectorySetupHandler extends AbstractHandler{
- private static final Logger log = LoggerFactory.getLogger(GSISSHDirectorySetupHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- try {
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) {
- GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
- }
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- } catch (GFacException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
- log.info("Setup SSH job directorties");
- super.invoke(jobExecutionContext);
- makeDirectory(jobExecutionContext);
- }
- private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- try {
- Cluster cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- if (cluster == null) {
- throw new GFacHandlerException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-
- String workingDirectory = app.getScratchWorkingDirectory();
- cluster.makeDirectory(workingDirectory);
- cluster.makeDirectory(app.getScratchWorkingDirectory());
- cluster.makeDirectory(app.getInputDataDirectory());
- cluster.makeDirectory(app.getOutputDataDirectory());
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.DIRECTORY_SETUP);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Working directory = " + workingDirectory);
-
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- } catch (SSHApiException e) {
- throw new GFacHandlerException("Error executing the Handler: " + GSISSHDirectorySetupHandler.class, e);
- } catch (Exception e) {
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- try {
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error executing the Handler: " + GSISSHDirectorySetupHandler.class, e);
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHInputHandler.java
deleted file mode 100644
index 6f4fafd..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHInputHandler.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
-import org.apache.airavata.schemas.gfac.URIParameterType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-public class GSISSHInputHandler extends AbstractHandler {
- private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class);
-
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- MessageContext inputNew = new MessageContext();
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- try {
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) {
- try {
- GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- log.info("Invoking SCPInputHandler");
- super.invoke(jobExecutionContext);
-
-
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName : parameters) {
- ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
- String paramValue = MappingFactory.toString(actualParameter);
- //TODO: Review this with type
- if ("URI".equals(actualParameter.getType().getType().toString())) {
- ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
- } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
- List<String> newFiles = new ArrayList<String>();
- for (String paramValueEach : split) {
- String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
- status.setTransferState(TransferState.UPLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- newFiles.add(stageInputFiles);
- }
- ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
- }
- inputNew.getParameters().put(paramName, actualParameter);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- try {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- jobExecutionContext.setInMessageContext(inputNew);
- }
-
- private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- } else {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- int i = paramValue.lastIndexOf(File.separator);
- String substring = paramValue.substring(i + 1);
- try {
- String targetFile = app.getInputDataDirectory() + File.separator + substring;
- if (paramValue.startsWith("file")) {
- paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
- }
- cluster.scpTo(targetFile, paramValue);
- return targetFile;
- } catch (SSHApiException e) {
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHOutputHandler.java
deleted file mode 100644
index 3855248..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/handler/GSISSHOutputHandler.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.handler;
-
-import net.schmizz.sshj.connection.ConnectionException;
-import net.schmizz.sshj.transport.TransportException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.ApplicationDescription;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.utils.OutputUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.model.workspace.experiment.*;
-import org.apache.airavata.registry.cpi.ChildDataType;
-import org.apache.airavata.registry.cpi.DataType;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.URIParameterType;
-import org.apache.xmlbeans.XmlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class GSISSHOutputHandler extends AbstractHandler{
- private static final Logger log = LoggerFactory.getLogger(GSISSHOutputHandler.class);
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- if(jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType) { // this is because we don't have the right jobexecution context
- // so attempting to get it from the registry
- if (Constants.PUSH.equals(((GsisshHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getMonitorMode())) {
- log.warn("During the out handler chain jobExecution context came null, so trying to handler");
- ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
- TaskDetails taskData = null;
- try {
- taskData = (TaskDetails) jobExecutionContext.getRegistry().get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
- } catch (RegistryException e) {
- log.error("Error retrieving job details from Registry");
- throw new GFacHandlerException("Error retrieving job details from Registry", e);
- }
- JobDetails jobDetails = taskData.getJobDetailsList().get(0);
- String jobDescription = jobDetails.getJobDescription();
- if (jobDescription != null) {
- JobDescriptor jobDescriptor = null;
- try {
- jobDescriptor = JobDescriptor.fromXML(jobDescription);
- } catch (XmlException e1) {
- e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- applicationDeploymentDescription.getType().setScratchWorkingDirectory(
- jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
- applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
- applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
- applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
- applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
- }
- }
- }
- try {
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) {
-
- GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
- }
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- } catch (GFacException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- super.invoke(jobExecutionContext);
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
-
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- } else {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
-
- // Get the Stdouts and StdErrs
- String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
-
- TaskDetails taskData = jobExecutionContext.getTaskData();
- String outputDataDir = null;
- File localStdOutFile;
- File localStdErrFile;
-
- if (taskData.getAdvancedOutputDataHandling() != null) {
- outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
- }
- if (outputDataDir == null) {
- outputDataDir = File.separator + "tmp";
- }
- outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
- (new File(outputDataDir)).mkdirs();
-
-
- localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout");
- localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr");
-// cluster.makeDirectory(outputDataDir);
- cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
- Thread.sleep(1000);
- cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
- Thread.sleep(1000);
-
- String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
- String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
- status.setTransferState(TransferState.COMPLETE);
- detail.setTransferStatus(status);
- detail.setTransferDescription("STDOUT:" + stdOutStr);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- status.setTransferState(TransferState.COMPLETE);
- detail.setTransferStatus(status);
- detail.setTransferDescription("STDERR:" + stdErrStr);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
-
- Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- Set<String> keys = output.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) output.get(paramName);
- if ("URI".equals(actualParameter.getType().getType().toString())) {
-
- List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
- if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- } else {
- String valueList = outputList.get(0);
- cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
- jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
- ((URIParameterType) actualParameter.getType()).setValue(valueList);
- stringMap = new HashMap<String, ActualParameter>();
- stringMap.put(paramName, actualParameter);
- }
- } else {
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- }
- }
- if (stringMap == null || stringMap.isEmpty()) {
- throw new GFacHandlerException(
- "Empty Output returned from the Application, Double check the application"
- + "and ApplicationDescriptor output Parameter Names");
- }
- status.setTransferState(TransferState.DOWNLOAD);
- detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- app.setStandardError(localStdErrFile.getAbsolutePath());
- app.setStandardOutput(localStdOutFile.getAbsolutePath());
- app.setOutputDataDirectory(outputDataDir);
- } catch (XmlException e) {
- throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
- } catch (ConnectionException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (TransportException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (IOException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (Exception e) {
- try {
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error in retrieving results", e);
- }
-
- }
-
- public void initProperties(Map<String, String> properties) throws GFacHandlerException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
deleted file mode 100644
index da5b330..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.provider.impl;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.cpi.GFacImpl;
-import org.apache.airavata.gfac.handler.ThreadedHandler;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
-import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class GSISSHProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- super.initialize(jobExecutionContext);
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- log.info("Invoking GSISSH Provider Invoke ...");
- jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
- HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
- JobDetails jobDetails = new JobDetails();
- String taskID = jobExecutionContext.getTaskData().getTaskID();
- try {
- Cluster cluster = null;
- if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
- cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
- }
- if (cluster == null) {
- throw new GFacProviderException("Security context is not set properly");
- } else {
- log.info("Successfully retrieved the Security Context");
- }
- // This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
-
- log.info(jobDescriptor.toXML());
-
- jobDetails.setJobDescription(jobDescriptor.toXML());
-
- String jobID = cluster.submitBatchJob(jobDescriptor);
- jobExecutionContext.setJobDetails(jobDetails);
- if(jobID == null){
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- }else{
- jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
- }
-
-
- // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
- // to perform monitoring, daemon handlers can be accessed from anywhere
- List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
- ThreadedHandler pullMonitorHandler = null;
- for(ThreadedHandler threadedHandler:daemonHandlers){
- if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
- pullMonitorHandler = threadedHandler;
- }
- }
- // we know this hos is type GsiSSHHostType
- String monitorMode = ((GsisshHostType) host).getMonitorMode();
- if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
- log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
- pullMonitorHandler.invoke(jobExecutionContext);
- }else{
- log.error("Currently we only support Pull monitoring");
- }
- } catch (SSHApiException e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- } catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- throw new GFacProviderException(error, e);
- }
- }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/util/GFACGSISSHUtils.java
deleted file mode 100644
index 0351d00..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/util/GFACGSISSHUtils.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.util;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.RequestData;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
-import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.schemas.gfac.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-
-public class GFACGSISSHUtils {
- private final static Logger logger = LoggerFactory.getLogger(GFACGSISSHUtils.class);
-
- public static final String PBS_JOB_MANAGER = "pbs";
- public static final String SLURM_JOB_MANAGER = "slurm";
- public static final String SUN_GRID_ENGINE_JOB_MANAGER = "sge";
-
- public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
- RequestData requestData = new RequestData("default");
- GSISecurityContext context = null;
- try {
- //todo fix this
- context = new GSISecurityContext(null, requestData);
- } catch (Exception e) {
- throw new GFacException("An error occurred while creating GSI security context", e);
- }
- HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription();
- if (registeredHost.getType() instanceof GlobusHostType || registeredHost.getType() instanceof UnicoreHostType
- || registeredHost.getType() instanceof SSHHostType) {
- logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
- } else if (registeredHost.getType() instanceof GsisshHostType) {
- GSIAuthenticationInfo authenticationInfo
- = new MyProxyAuthenticationInfo(requestData.getMyProxyUserName(), requestData.getMyProxyPassword(), requestData.getMyProxyServerUrl(),
- requestData.getMyProxyPort(), requestData.getMyProxyLifeTime(), System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY));
- GsisshHostType gsisshHostType = (GsisshHostType) registeredHost.getType();
- ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), registeredHost.getType().getHostAddress(),
- gsisshHostType.getPort());
-
- Cluster pbsCluster = null;
- try {
- JobManagerConfiguration jConfig = null;
- String installedParentPath = ((HpcApplicationDeploymentType)
- jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
- String jobManager = ((GsisshHostType) registeredHost.getType()).getJobManager();
- if (jobManager == null) {
- logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
- jConfig = CommonUtils.getPBSJobManager(installedParentPath);
- } else {
- if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
- jConfig = CommonUtils.getPBSJobManager(installedParentPath);
- } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
- jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
- } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
- jConfig = CommonUtils.getSGEJobManager(installedParentPath);
- }
- }
- pbsCluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
- } catch (SSHApiException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
- context.setPbsCluster(pbsCluster);
- }
- jobExecutionContext.addSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT,context);
- }
- public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext,
- ApplicationDeploymentDescriptionType app, Cluster cluster) {
- JobDescriptor jobDescriptor = new JobDescriptor();
- // this is common for any application descriptor
- jobDescriptor.setInputDirectory(app.getInputDataDirectory());
- jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
- jobDescriptor.setExecutablePath(app.getExecutableLocation());
- jobDescriptor.setStandardOutFile(app.getStandardOutput());
- jobDescriptor.setStandardErrorFile(app.getStandardError());
- Random random = new Random();
- int i = random.nextInt();
- jobDescriptor.setJobName(app.getApplicationName().getStringValue() + String.valueOf(i));
- jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory());
-
-
- List<String> inputValues = new ArrayList<String>();
- MessageContext input = jobExecutionContext.getInMessageContext();
- Map<String, Object> inputs = input.getParameters();
- Set<String> keys = inputs.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
- if ("URIArray".equals(actualParameter.getType().getType().toString()) || "StringArray".equals(actualParameter.getType().getType().toString())
- || "FileArray".equals(actualParameter.getType().getType().toString())) {
- String[] values = null;
- if (actualParameter.getType() instanceof URIArrayType) {
- values = ((URIArrayType) actualParameter.getType()).getValueArray();
- } else if (actualParameter.getType() instanceof StringArrayType) {
- values = ((StringArrayType) actualParameter.getType()).getValueArray();
- } else if (actualParameter.getType() instanceof FileArrayType) {
- values = ((FileArrayType) actualParameter.getType()).getValueArray();
- }
- String value = StringUtil.createDelimiteredString(values, " ");
- inputValues.add(value);
- } else {
- String paramValue = MappingFactory.toString(actualParameter);
- inputValues.add(paramValue);
- }
- }
- jobDescriptor.setInputValues(inputValues);
-
- // this part will fill out the hpcApplicationDescriptor
- if (app instanceof HpcApplicationDeploymentType) {
- HpcApplicationDeploymentType applicationDeploymentType
- = (HpcApplicationDeploymentType) app;
- jobDescriptor.setShellName("/bin/bash");
- jobDescriptor.setAllEnvExport(true);
- jobDescriptor.setMailOptions("n");
- jobDescriptor.setNodes(applicationDeploymentType.getNodeCount());
- jobDescriptor.setProcessesPerNode(applicationDeploymentType.getProcessorsPerNode());
- jobDescriptor.setMaxWallTime(String.valueOf(applicationDeploymentType.getMaxWallTime()));
- jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
- if (applicationDeploymentType.getProjectAccount() != null) {
- if (applicationDeploymentType.getProjectAccount().getProjectAccountNumber() != null) {
- jobDescriptor.setAcountString(applicationDeploymentType.getProjectAccount().getProjectAccountNumber());
- }
- }
- if (applicationDeploymentType.getQueue() != null) {
- if (applicationDeploymentType.getQueue().getQueueName() != null) {
- jobDescriptor.setQueueName(applicationDeploymentType.getQueue().getQueueName());
- }
- }
- jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
- TaskDetails taskData = jobExecutionContext.getTaskData();
- if (taskData != null && taskData.isSetTaskScheduling()) {
- ComputationalResourceScheduling computionnalResource = taskData.getTaskScheduling();
- if (computionnalResource.getNodeCount() > 0) {
- jobDescriptor.setNodes(computionnalResource.getNodeCount());
- }
- if (computionnalResource.getComputationalProjectAccount() != null) {
- jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount());
- }
- if (computionnalResource.getQueueName() != null) {
- jobDescriptor.setQueueName(computionnalResource.getQueueName());
- }
- if (computionnalResource.getTotalCPUCount() > 0) {
- jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount());
- }
- if (computionnalResource.getWallTimeLimit() > 0) {
- jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit()));
- }
- }
-
- }
- return jobDescriptor;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java b/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java
index be78570..162c2d2 100644
--- a/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java
@@ -37,7 +37,7 @@ import org.apache.airavata.gfac.SecurityContext;
import org.apache.airavata.gfac.context.ApplicationContext;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
import org.apache.airavata.gfac.cpi.GFacImpl;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/security/GSISecurityContextTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/security/GSISecurityContextTestWithMyProxyAuth.java b/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/security/GSISecurityContextTestWithMyProxyAuth.java
index 6306e3c..86bd4d3 100644
--- a/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/security/GSISecurityContextTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/security/GSISecurityContextTestWithMyProxyAuth.java
@@ -29,7 +29,7 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.credential.store.store.CredentialReaderFactory;
import org.apache.airavata.gfac.RequestData;
-import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
import org.apache.log4j.Logger;
import org.ietf.jgss.GSSCredential;
import org.testng.annotations.AfterClass;
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/handler/HDFSDataMovementHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/handler/HDFSDataMovementHandler.java b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/handler/HDFSDataMovementHandler.java
new file mode 100644
index 0000000..f54b7a5
--- /dev/null
+++ b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/handler/HDFSDataMovementHandler.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.hadoop.handler;
+
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.hadoop.provider.utils.HadoopUtils;
+import org.apache.airavata.gfac.handler.GFacHandler;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.HadoopApplicationDeploymentDescriptionType;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class HDFSDataMovementHandler implements GFacHandler {
+ private static final Logger logger = LoggerFactory.getLogger(HDFSDataMovementHandler.class);
+
+ private boolean isWhirrBasedDeployment = false;
+ private File hadoopConfigDir;
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+ if(inMessageContext.getParameter("HADOOP_DEPLOYMENT_TYPE").equals("WHIRR")){
+ isWhirrBasedDeployment = true;
+ } else {
+ String hadoopConfigDirPath = (String)inMessageContext.getParameter("HADOOP_CONFIG_DIR");
+ File hadoopConfigDir = new File(hadoopConfigDirPath);
+ if (!hadoopConfigDir.exists()){
+ throw new GFacHandlerException("Specified hadoop configuration directory doesn't exist.");
+ } else if (FileUtils.listFiles(hadoopConfigDir, null, null).size() <= 0){
+ throw new GFacHandlerException("Cannot find any hadoop configuration files inside specified directory.");
+ }
+
+ this.hadoopConfigDir = hadoopConfigDir;
+ }
+
+ if(jobExecutionContext.isInPath()){
+ try {
+ handleInPath(jobExecutionContext);
+ } catch (IOException e) {
+ throw new GFacHandlerException("Error while copying input data from local file system to HDFS.",e);
+ }
+ } else {
+ handleOutPath(jobExecutionContext);
+ }
+ }
+
+ private void handleInPath(JobExecutionContext jobExecutionContext) throws GFacHandlerException, IOException {
+ ApplicationDeploymentDescriptionType appDepDesc =
+ jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ HadoopApplicationDeploymentDescriptionType hadoopAppDesc =
+ (HadoopApplicationDeploymentDescriptionType)appDepDesc;
+ if(appDepDesc.isSetInputDataDirectory() && isInputDataDirectoryLocal(appDepDesc)){
+ Configuration hadoopConf = HadoopUtils.createHadoopConfiguration(jobExecutionContext, isWhirrBasedDeployment, hadoopConfigDir);
+ FileSystem hdfs = FileSystem.get(hadoopConf);
+ hdfs.copyFromLocalFile(new Path(appDepDesc.getInputDataDirectory()),
+ new Path(hadoopAppDesc.getHadoopJobConfiguration().getHdfsInputDirectory()));
+ }
+ }
+
+ private boolean isInputDataDirectoryLocal(ApplicationDeploymentDescriptionType appDepDesc){
+ String inputDataDirectoryPath = appDepDesc.getInputDataDirectory();
+ File inputDataDirectory = new File(inputDataDirectoryPath);
+ if(inputDataDirectory.exists() && FileUtils.listFiles(inputDataDirectory, null, null).size() > 0){
+ return true;
+ }
+
+ return false;
+ }
+
+ private void handleOutPath(JobExecutionContext jobExecutionContext){}
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/handler/HadoopDeploymentHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/handler/HadoopDeploymentHandler.java b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/handler/HadoopDeploymentHandler.java
new file mode 100644
index 0000000..24e0e9a
--- /dev/null
+++ b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/handler/HadoopDeploymentHandler.java
@@ -0,0 +1,276 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.hadoop.handler;
+
+import com.google.common.io.Files;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.handler.GFacHandler;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.schemas.gfac.HadoopHostType;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.whirr.Cluster;
+import org.apache.whirr.ClusterController;
+import org.apache.whirr.ClusterControllerFactory;
+import org.apache.whirr.ClusterSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.*;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.whirr.ClusterSpec.Property.*;
+import static org.apache.whirr.ClusterSpec.Property.INSTANCE_TEMPLATES;
+import static org.apache.whirr.ClusterSpec.Property.PRIVATE_KEY_FILE;
+
+/**
+ * This handler takes care of deploying hadoop in cloud(in cloud bursting scenarios) and
+ * deploying hadoop in local cluster. In case of existing hadoop cluster this will ignore
+ * cluster setup just use the hadoop configuration provided by user.
+ */
+public class HadoopDeploymentHandler implements GFacHandler {
+ private static final Logger logger = LoggerFactory.getLogger("hadoop-dep-handler");
+
+ /**
+ * Once invoked this method will deploy Hadoop in a local cluster or cloud based on the
+ * configuration provided. If there is a already deployed hadoop cluster this will skip
+ * deployment.
+ *
+ *
+ * @param jobExecutionContext job execution context containing all the required configurations
+ * and runtime information.
+ * @throws org.apache.airavata.gfac.handler.GFacHandlerException
+ */
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ if(jobExecutionContext.isInPath()){
+ handleInPath(jobExecutionContext);
+ } else {
+ handleOutPath(jobExecutionContext);
+ }
+ }
+
+ private void handleInPath(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ HostDescription hostDescription =
+ jobExecutionContext.getApplicationContext().getHostDescription();
+ if (!isHadoopDeploymentAvailable(hostDescription)) {
+ // Temp directory to keep generated configuration files.
+ File tempDirectory = Files.createTempDir();
+ try {
+ File hadoopSiteXML = launchHadoopCluster(hostDescription, tempDirectory);
+ jobExecutionContext.getInMessageContext().addParameter("HADOOP_SITE_XML", hadoopSiteXML.getAbsolutePath());
+ jobExecutionContext.getInMessageContext().addParameter("HADOOP_DEPLOYMENT_TYPE", "WHIRR");
+ // TODO: Add hadoop-site.xml to job execution context.
+ } catch (IOException e) {
+ throw new GFacHandlerException("IO Error while processing configurations.",e);
+ } catch (ConfigurationException e) {
+ throw new GFacHandlerException("Whirr configuration error.", e);
+ } catch (InterruptedException e) {
+ throw new GFacHandlerException("Hadoop cluster launch interrupted.", e);
+ } catch (TransformerException e) {
+ throw new GFacHandlerException("Error while creating hadoop-site.xml", e);
+ } catch (ParserConfigurationException e) {
+ throw new GFacHandlerException("Error while creating hadoop-site.xml", e);
+ }
+ } else {
+ jobExecutionContext.getInMessageContext().addParameter("HADOOP_DEPLOYMENT_TYPE",
+ "MANUAL");
+ jobExecutionContext.getInMessageContext().addParameter("HADOOP_CONFIG_DIR",
+ ((HadoopHostType)hostDescription.getType()).getHadoopConfigurationDirectory());
+ logger.info("Hadoop configuration is available. Skipping hadoop deployment.");
+ if(logger.isDebugEnabled()){
+ logger.debug("Hadoop configuration directory: " +
+ getHadoopConfigDirectory(hostDescription));
+ }
+ }
+ }
+
+ private void handleOutPath(JobExecutionContext jobExecutionContext){
+ MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+ if(((String)inMessageContext.getParameter("HADOOP_DEPLOYMENT_TYPE")).equals("WHIRR")){
+ // TODO: Shutdown hadoop cluster.
+ logger.info("Shutdown hadoop cluster.");
+ }
+ }
+
+ private File launchHadoopCluster(HostDescription hostDescription, File workingDirectory)
+ throws IOException, GFacHandlerException, ConfigurationException, InterruptedException, TransformerException, ParserConfigurationException {
+ ClusterSpec hadoopClusterSpec =
+ whirrConfigurationToClusterSpec(hostDescription, workingDirectory);
+ ClusterController hadoopClusterController =
+ createClusterController(hadoopClusterSpec.getServiceName());
+ Cluster hadoopCluster = hadoopClusterController.launchCluster(hadoopClusterSpec);
+
+ logger.info(String.format("Started cluster of %s instances.\n",
+ hadoopCluster.getInstances().size()));
+
+ File siteXML = new File(workingDirectory, "hadoop-site.xml");
+ clusterPropertiesToHadoopSiteXml(hadoopCluster.getConfiguration(), siteXML);
+
+ return siteXML;
+ }
+
+ private ClusterController createClusterController(String serviceName){
+ ClusterControllerFactory factory = new ClusterControllerFactory();
+ ClusterController controller = factory.create(serviceName);
+
+ if(controller == null){
+ logger.warn("Unable to find the service {0}, using default.", serviceName);
+ controller = factory.create(null);
+ }
+
+ return controller;
+ }
+
+ private ClusterSpec whirrConfigurationToClusterSpec(HostDescription hostDescription,
+ File workingDirectory) throws IOException, GFacHandlerException, ConfigurationException {
+ File whirrConfig = getWhirrConfigurationFile(hostDescription, workingDirectory);
+ CompositeConfiguration compositeConfiguration = new CompositeConfiguration();
+ Configuration configuration = new PropertiesConfiguration(whirrConfig);
+ compositeConfiguration.addConfiguration(configuration);
+
+ ClusterSpec hadoopClusterSpec = new ClusterSpec(compositeConfiguration);
+
+ for (ClusterSpec.Property required : EnumSet.of(CLUSTER_NAME, PROVIDER, IDENTITY, CREDENTIAL,
+ INSTANCE_TEMPLATES, PRIVATE_KEY_FILE)) {
+ if (hadoopClusterSpec.getConfiguration().getString(required.getConfigName()) == null) {
+ throw new IllegalArgumentException(String.format("Option '%s' not set.",
+ required.getSimpleName()));
+ }
+ }
+
+ return hadoopClusterSpec;
+ }
+
+ private File getWhirrConfigurationFile(HostDescription hostDescription, File workingDirectory)
+ throws GFacHandlerException, IOException {
+ HadoopHostType hadoopHostDesc = (HadoopHostType)hostDescription;
+ if(hadoopHostDesc.isSetWhirrConfiguration()){
+ HadoopHostType.WhirrConfiguration whirrConfig = hadoopHostDesc.getWhirrConfiguration();
+ if(whirrConfig.isSetConfigurationFile()){
+ File whirrConfigFile = new File(whirrConfig.getConfigurationFile());
+ if(!whirrConfigFile.exists()){
+ throw new GFacHandlerException(
+ "Specified whirr configuration file doesn't exists.");
+ }
+
+ FileUtils.copyFileToDirectory(whirrConfigFile, workingDirectory);
+
+ return new File(workingDirectory, whirrConfigFile.getName());
+ } else if(whirrConfig.isSetConfiguration()){
+ Properties whirrConfigProps =
+ whirrConfigurationsToProperties(whirrConfig.getConfiguration());
+ File whirrConfigFile = new File(workingDirectory, "whirr-hadoop.config");
+ whirrConfigProps.store(
+ new FileOutputStream(whirrConfigFile), null);
+
+ return whirrConfigFile;
+ }
+ }
+
+ throw new GFacHandlerException("Cannot find Whirr configurations. Whirr configuration "
+ + "is required if you don't have already running Hadoop deployment.");
+ }
+
+ private Properties whirrConfigurationsToProperties(
+ HadoopHostType.WhirrConfiguration.Configuration configuration){
+ Properties whirrConfigProps = new Properties();
+
+ for(HadoopHostType.WhirrConfiguration.Configuration.Property property:
+ configuration.getPropertyArray()) {
+ whirrConfigProps.put(property.getName(), property.getValue());
+ }
+
+ return whirrConfigProps;
+ }
+
+ private void clusterPropertiesToHadoopSiteXml(Properties props, File hadoopSiteXml) throws ParserConfigurationException, TransformerException {
+ DocumentBuilderFactory domFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder documentBuilder = domFactory.newDocumentBuilder();
+
+ Document hadoopSiteXmlDoc = documentBuilder.newDocument();
+
+ hadoopSiteXmlDoc.setXmlVersion("1.0");
+ hadoopSiteXmlDoc.setXmlStandalone(true);
+ hadoopSiteXmlDoc.createProcessingInstruction("xml-stylesheet", "type=\"text/xsl\" href=\"configuration.xsl\"");
+
+ Element configEle = hadoopSiteXmlDoc.createElement("configuration");
+
+ hadoopSiteXmlDoc.appendChild(configEle);
+
+ for(Map.Entry<Object, Object> entry : props.entrySet()){
+ addPropertyToConfiguration(entry, configEle, hadoopSiteXmlDoc);
+ }
+
+ saveDomToFile(hadoopSiteXmlDoc, hadoopSiteXml);
+ }
+
+ private void saveDomToFile(Document dom, File destFile) throws TransformerException {
+ Source source = new DOMSource(dom);
+
+ Result result = new StreamResult(destFile);
+
+ Transformer transformer = TransformerFactory.newInstance().newTransformer();
+ transformer.transform(source, result);
+ }
+
+ private void addPropertyToConfiguration(Map.Entry<Object, Object> entry, Element configElement, Document doc){
+ Element property = doc.createElement("property");
+ configElement.appendChild(property);
+
+ Element nameEle = doc.createElement("name");
+ nameEle.setTextContent(entry.getKey().toString());
+ property.appendChild(nameEle);
+
+ Element valueEle = doc.createElement("value");
+ valueEle.setTextContent(entry.getValue().toString());
+ property.appendChild(valueEle);
+ }
+
+ private boolean isHadoopDeploymentAvailable(HostDescription hostDescription) {
+ return ((HadoopHostType) hostDescription.getType()).isSetHadoopConfigurationDirectory();
+ }
+
+ private String getHadoopConfigDirectory(HostDescription hostDescription){
+ return ((HadoopHostType)hostDescription.getType()).getHadoopConfigurationDirectory();
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/9bb8c2be/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java
new file mode 100644
index 0000000..f0eb06b
--- /dev/null
+++ b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.hadoop.provider.impl;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.hadoop.provider.utils.HadoopUtils;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.schemas.gfac.HadoopApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.OutputParameterType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * Executes hadoop job using the cluster configuration provided by handlers in
+ * in-flow.
+ */
+public class HadoopProvider extends AbstractProvider {
+ private static final Logger logger = LoggerFactory.getLogger(HadoopProvider.class);
+
+ private boolean isWhirrBasedDeployment = false;
+ private File hadoopConfigDir;
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+ if(inMessageContext.getParameter("HADOOP_DEPLOYMENT_TYPE").equals("WHIRR")){
+ isWhirrBasedDeployment = true;
+ } else {
+ String hadoopConfigDirPath = (String)inMessageContext.getParameter("HADOOP_CONFIG_DIR");
+ File hadoopConfigDir = new File(hadoopConfigDirPath);
+ if (!hadoopConfigDir.exists()){
+ throw new GFacProviderException("Specified hadoop configuration directory doesn't exist.");
+ } else if (FileUtils.listFiles(hadoopConfigDir, null, null).size() <= 0){
+ throw new GFacProviderException("Cannot find any hadoop configuration files inside specified directory.");
+ }
+
+ this.hadoopConfigDir = hadoopConfigDir;
+ }
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ HadoopApplicationDeploymentDescriptionType hadoopAppDesc =
+ (HadoopApplicationDeploymentDescriptionType)jobExecutionContext
+ .getApplicationContext().getApplicationDeploymentDescription().getType();
+ MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+ HadoopApplicationDeploymentDescriptionType.HadoopJobConfiguration jobConf = hadoopAppDesc.getHadoopJobConfiguration();
+
+ try{
+ // Preparing Hadoop configuration
+ Configuration hadoopConf = HadoopUtils.createHadoopConfiguration(
+ jobExecutionContext, isWhirrBasedDeployment, hadoopConfigDir);
+
+ // Load jar containing map-reduce job implementation
+ ArrayList<URL> mapRedJars = new ArrayList<URL>();
+ mapRedJars.add(new File(jobConf.getJarLocation()).toURL());
+ URLClassLoader childClassLoader = new URLClassLoader(mapRedJars.toArray(new URL[mapRedJars.size()]),
+ this.getClass().getClassLoader());
+
+ Job job = new Job(hadoopConf);
+
+ job.setJobName(jobConf.getJobName());
+
+ job.setOutputKeyClass(Class.forName(jobConf.getOutputKeyClass(), true, childClassLoader));
+ job.setOutputValueClass(Class.forName(jobConf.getOutputValueClass(), true, childClassLoader));
+
+ job.setMapperClass((Class<? extends Mapper>)Class.forName(jobConf.getMapperClass(), true, childClassLoader));
+ job.setCombinerClass((Class<? extends Reducer>) Class.forName(jobConf.getCombinerClass(), true, childClassLoader));
+ job.setReducerClass((Class<? extends Reducer>) Class.forName(jobConf.getCombinerClass(), true, childClassLoader));
+
+ job.setInputFormatClass((Class<? extends InputFormat>)Class.forName(jobConf.getInputFormatClass(), true, childClassLoader));
+ job.setOutputFormatClass((Class<? extends OutputFormat>) Class.forName(jobConf.getOutputFormatClass(), true, childClassLoader));
+
+ FileInputFormat.setInputPaths(job, new Path(hadoopAppDesc.getInputDataDirectory()));
+ FileOutputFormat.setOutputPath(job, new Path(hadoopAppDesc.getOutputDataDirectory()));
+
+ job.waitForCompletion(true);
+ System.out.println(job.getTrackingURL());
+ if(jobExecutionContext.getOutMessageContext() == null){
+ jobExecutionContext.setOutMessageContext(new MessageContext());
+ }
+
+ OutputParameterType[] outputParametersArray = jobExecutionContext.getApplicationContext().
+ getServiceDescription().getType().getOutputParametersArray();
+ for(OutputParameterType outparamType : outputParametersArray){
+ String paramName = outparamType.getParameterName();
+ if(paramName.equals("test-hadoop")){
+ ActualParameter outParam = new ActualParameter();
+ outParam.getType().changeType(StringParameterType.type);
+ ((StringParameterType) outParam.getType()).setValue(job.getTrackingURL());
+ jobExecutionContext.getOutMessageContext().addParameter("test-hadoop", outParam);
+ }
+ }
+ } catch (Exception e) {
+ String errMessage = "Error occurred during Map-Reduce job execution.";
+ logger.error(errMessage, e);
+ throw new GFacProviderException(errMessage, e);
+ }
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ // TODO: How to handle cluster shutdown. Best way is to introduce inPath/outPath to handler.
+ }
+
+ @Override
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ throw new NotImplementedException();
+ }
+
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+}