You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/24 16:15:05 UTC
[4/5] airavata git commit: fixing compilation issues
http://git-wip-us.apache.org/repos/asf/airavata/blob/df3fbe6a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
index 884ccd5..21991fd 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
@@ -1,92 +1,92 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.local.handler;
-
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-
-
-public class LocalInputHandler extends AbstractHandler {
- private static final Logger logger = LoggerFactory.getLogger(LocalInputHandler.class);
- @Override
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- super.invoke(jobExecutionContext);
- Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters();
- for (Map.Entry<String, Object> inputParamEntry : inputParameters.entrySet()) {
- if (inputParamEntry.getValue() instanceof InputDataObjectType) {
- InputDataObjectType inputDataObject = (InputDataObjectType) inputParamEntry.getValue();
- if (inputDataObject.getType() == DataType.URI
- && inputDataObject != null
- && !inputDataObject.getValue().equals("")) {
- try {
- inputDataObject.setValue(stageFile(jobExecutionContext.getInputDir(), inputDataObject.getValue()));
- } catch (IOException e) {
- throw new GFacHandlerException("Error while data staging sourceFile= " + inputDataObject.getValue());
- }
- }
- }
- }
- }
-
- private String stageFile(String inputDir, String sourceFilePath) throws IOException {
- int i = sourceFilePath.lastIndexOf(File.separator);
- String substring = sourceFilePath.substring(i + 1);
- if (inputDir.endsWith("/")) {
- inputDir = inputDir.substring(0, inputDir.length() - 1);
- }
- String targetFilePath = inputDir + File.separator + substring;
-
- if (sourceFilePath.startsWith("file")) {
- sourceFilePath = sourceFilePath.substring(sourceFilePath.indexOf(":") + 1, sourceFilePath.length());
- }
-
- File sourceFile = new File(sourceFilePath);
- File targetFile = new File(targetFilePath);
- if (targetFile.exists()) {
- targetFile.delete();
- }
- logger.info("staging source file : " + sourceFilePath + " to target file : " + targetFilePath);
- FileUtils.copyFile(sourceFile, targetFile);
-
- return targetFilePath;
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-
- }
-
- @Override
- public void initProperties(Properties properties) throws GFacHandlerException {
-
- }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.gfac.local.handler;
+//
+//import org.apache.airavata.gfac.core.context.JobExecutionContext;
+//import org.apache.airavata.gfac.core.handler.AbstractHandler;
+//import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+//import org.apache.airavata.model.appcatalog.appinterface.DataType;
+//import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+//import org.apache.commons.io.FileUtils;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.File;
+//import java.io.IOException;
+//import java.util.Map;
+//import java.util.Properties;
+//
+//
+//public class LocalInputHandler extends AbstractHandler {
+// private static final Logger logger = LoggerFactory.getLogger(LocalInputHandler.class);
+// @Override
+// public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+// super.invoke(jobExecutionContext);
+// Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters();
+// for (Map.Entry<String, Object> inputParamEntry : inputParameters.entrySet()) {
+// if (inputParamEntry.getValue() instanceof InputDataObjectType) {
+// InputDataObjectType inputDataObject = (InputDataObjectType) inputParamEntry.getValue();
+// if (inputDataObject.getType() == DataType.URI
+// && inputDataObject != null
+// && !inputDataObject.getValue().equals("")) {
+// try {
+// inputDataObject.setValue(stageFile(jobExecutionContext.getInputDir(), inputDataObject.getValue()));
+// } catch (IOException e) {
+// throw new GFacHandlerException("Error while data staging sourceFile= " + inputDataObject.getValue());
+// }
+// }
+// }
+// }
+// }
+//
+// private String stageFile(String inputDir, String sourceFilePath) throws IOException {
+// int i = sourceFilePath.lastIndexOf(File.separator);
+// String substring = sourceFilePath.substring(i + 1);
+// if (inputDir.endsWith("/")) {
+// inputDir = inputDir.substring(0, inputDir.length() - 1);
+// }
+// String targetFilePath = inputDir + File.separator + substring;
+//
+// if (sourceFilePath.startsWith("file")) {
+// sourceFilePath = sourceFilePath.substring(sourceFilePath.indexOf(":") + 1, sourceFilePath.length());
+// }
+//
+// File sourceFile = new File(sourceFilePath);
+// File targetFile = new File(targetFilePath);
+// if (targetFile.exists()) {
+// targetFile.delete();
+// }
+// logger.info("staging source file : " + sourceFilePath + " to target file : " + targetFilePath);
+// FileUtils.copyFile(sourceFile, targetFile);
+//
+// return targetFilePath;
+// }
+//
+// @Override
+// public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+//
+// }
+//
+// @Override
+// public void initProperties(Properties properties) throws GFacHandlerException {
+//
+// }
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/df3fbe6a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 5bd75e5..2ea6518 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -1,309 +1,309 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.local.provider.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.airavata.gfac.core.GFacConstants;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.provider.AbstractProvider;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.impl.OutputUtils;
-import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
-import org.apache.airavata.gfac.local.utils.InputUtils;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
-import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
-import org.apache.airavata.model.experiment.JobDetails;
-import org.apache.airavata.model.experiment.JobState;
-import org.apache.airavata.model.experiment.TaskDetails;
-import org.apache.airavata.registry.cpi.ExpCatChildDataType;
-import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
-import org.apache.xmlbeans.XmlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-public class LocalProvider extends AbstractProvider {
- private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
- private ProcessBuilder builder;
- private List<String> cmdList;
- private String jobId;
-
- public static class LocalProviderJobData{
- private String applicationName;
- private List<String> inputParameters;
- private String workingDir;
- private String inputDir;
- private String outputDir;
- public String getApplicationName() {
- return applicationName;
- }
- public void setApplicationName(String applicationName) {
- this.applicationName = applicationName;
- }
- public List<String> getInputParameters() {
- return inputParameters;
- }
- public void setInputParameters(List<String> inputParameters) {
- this.inputParameters = inputParameters;
- }
- public String getWorkingDir() {
- return workingDir;
- }
- public void setWorkingDir(String workingDir) {
- this.workingDir = workingDir;
- }
- public String getInputDir() {
- return inputDir;
- }
- public void setInputDir(String inputDir) {
- this.inputDir = inputDir;
- }
- public String getOutputDir() {
- return outputDir;
- }
- public void setOutputDir(String outputDir) {
- this.outputDir = outputDir;
- }
- }
- public LocalProvider(){
- cmdList = new ArrayList<String>();
- }
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
- super.initialize(jobExecutionContext);
-
- // build command with all inputs
- buildCommand();
- initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription());
-
- // extra environment variables
- builder.environment().put(GFacConstants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir());
- builder.environment().put(GFacConstants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir());
-
- // set working directory
- builder.directory(new File(jobExecutionContext.getWorkingDir()));
-
- // log info
- log.info("Command = " + InputUtils.buildCommand(cmdList));
- log.info("Working dir = " + builder.directory());
- /*for (String key : builder.environment().keySet()) {
- log.info("Env[" + key + "] = " + builder.environment().get(key));
- }*/
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- JobDetails jobDetails = new JobDetails();
- try {
- jobId = jobExecutionContext.getTaskData().getTaskID();
- jobDetails.setJobID(jobId);
- jobDetails.setJobDescription(jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getAppDeploymentDescription());
- jobExecutionContext.setJobDetails(jobDetails);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SETUP);
- // running cmd
- Process process = builder.start();
-
- Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput());
- Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), jobExecutionContext.getStandardError());
-
- // start output threads
- standardOutWriter.setDaemon(true);
- standardErrorWriter.setDaemon(true);
- standardOutWriter.start();
- standardErrorWriter.start();
-
- int returnValue = process.waitFor();
-
- // make sure other two threads are done
- standardOutWriter.join();
- standardErrorWriter.join();
-
- /*
- * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
- * just provide warning in the log messages
- */
- if (returnValue != 0) {
- log.error("Process finished with non zero return value. Process may have failed");
- } else {
- log.info("Process finished with return value of zero.");
- }
-
- StringBuffer buf = new StringBuffer();
- buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
- .append(" on the localHost, working directory = ").append(jobExecutionContext.getWorkingDir())
- .append(" tempDirectory = ").append(jobExecutionContext.getScratchLocation()).append(" With the status ")
- .append(String.valueOf(returnValue));
-
- log.info(buf.toString());
-
- // updating the job status to complete because there's nothing to monitor in local jobs
-// MonitorID monitorID = createMonitorID(jobExecutionContext);
- JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
- jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- jobExecutionContext.getLocalEventPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
- } catch (IOException io) {
- throw new GFacProviderException(io.getMessage(), io);
- } catch (InterruptedException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }catch (GFacException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }
- }
-
-// private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) {
-// MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId,
-// jobExecutionContext.getTaskData().getTaskID(),
-// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
-// jobExecutionContext.getExperiment().getUserName(),jobId);
-// return monitorID;
-// }
-
-// private void saveApplicationJob(JobExecutionContext jobExecutionContext)
-// throws GFacProviderException {
-// ApplicationDeploymentDescriptionType app = jobExecutionContext.
-// getApplicationContext().getApplicationDeploymentDescription().getType();
-// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
-// appJob.setJobId(jobId);
-// LocalProviderJobData data = new LocalProviderJobData();
-// data.setApplicationName(app.getExecutableLocation());
-// data.setInputDir(app.getInputDataDirectory());
-// data.setOutputDir(app.getOutputDataDirectory());
-// data.setWorkingDir(builder.directory().toString());
-// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
-// ByteArrayOutputStream stream = new ByteArrayOutputStream();
-// JAXB.marshal(data, stream);
-// appJob.setJobData(stream.toString());
-// appJob.setSubmittedTime(Calendar.getInstance().getTime());
-// appJob.setStatus(ApplicationJobStatus.SUBMITTED);
-// appJob.setStatusUpdateTime(appJob.getSubmittedTime());
-// GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
-// }
-
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- try {
- List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
- String stdOutStr = GFacUtils.readFileToString(jobExecutionContext.getStandardOutput());
- String stdErrStr = GFacUtils.readFileToString(jobExecutionContext.getStandardError());
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
- TaskDetails taskDetails = (TaskDetails) experimentCatalog.get(ExperimentCatalogModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
- if (taskDetails != null){
- taskDetails.setApplicationOutputs(outputArray);
- experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID());
- }
- experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
- TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- jobExecutionContext.getLocalEventPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
- } catch (XmlException e) {
- throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
- } catch (IOException io) {
- throw new GFacProviderException(io.getMessage(), io);
- } catch (Exception e){
- throw new GFacProviderException("Error in retrieving results",e);
- }
- }
-
- public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
- throw new NotImplementedException();
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- // TODO: Auto generated method body.
- }
-
- @Override
- public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- // TODO: Auto generated method body.
- }
-
-
- private void buildCommand() {
- cmdList.add(jobExecutionContext.getExecutablePath());
- Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters();
-
- // sort the inputs first and then build the command List
- Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
- @Override
- public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
- return inputDataObjectType.getInputOrder() - t1.getInputOrder();
- }
- };
- Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
- for (Object object : inputParameters.values()) {
- if (object instanceof InputDataObjectType) {
- InputDataObjectType inputDOT = (InputDataObjectType) object;
- sortedInputSet.add(inputDOT);
- }
- }
- for (InputDataObjectType inputDataObjectType : sortedInputSet) {
- if (inputDataObjectType.getApplicationArgument() != null
- && !inputDataObjectType.getApplicationArgument().equals("")) {
- cmdList.add(inputDataObjectType.getApplicationArgument());
- }
-
- if (inputDataObjectType.getValue() != null
- && !inputDataObjectType.getValue().equals("")) {
- cmdList.add(inputDataObjectType.getValue());
- }
- }
-
- }
-
- private void initProcessBuilder(ApplicationDeploymentDescription app){
- builder = new ProcessBuilder(cmdList);
-
- List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
- if (setEnvironment != null) {
- for (SetEnvPaths envPath : setEnvironment) {
- Map<String,String> builderEnv = builder.environment();
- builderEnv.put(envPath.getName(), envPath.getValue());
- }
- }
- }
-
- public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
- }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.gfac.local.provider.impl;
+//
+//import java.io.File;
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.Comparator;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Set;
+//import java.util.TreeSet;
+//
+//import org.apache.airavata.gfac.core.GFacConstants;
+//import org.apache.airavata.gfac.core.GFacException;
+//import org.apache.airavata.gfac.core.context.JobExecutionContext;
+//import org.apache.airavata.gfac.core.provider.AbstractProvider;
+//import org.apache.airavata.gfac.core.provider.GFacProviderException;
+//import org.apache.airavata.gfac.core.GFacUtils;
+//import org.apache.airavata.gfac.impl.OutputUtils;
+//import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+//import org.apache.airavata.gfac.local.utils.InputUtils;
+//import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+//import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+//import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+//import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+//import org.apache.airavata.model.messaging.event.JobIdentifier;
+//import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+//import org.apache.airavata.model.messaging.event.TaskIdentifier;
+//import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+//import org.apache.airavata.model.experiment.JobDetails;
+//import org.apache.airavata.model.experiment.JobState;
+//import org.apache.airavata.model.experiment.TaskDetails;
+//import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+//import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+//import org.apache.xmlbeans.XmlException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+//
+//public class LocalProvider extends AbstractProvider {
+// private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
+// private ProcessBuilder builder;
+// private List<String> cmdList;
+// private String jobId;
+//
+// public static class LocalProviderJobData{
+// private String applicationName;
+// private List<String> inputParameters;
+// private String workingDir;
+// private String inputDir;
+// private String outputDir;
+// public String getApplicationName() {
+// return applicationName;
+// }
+// public void setApplicationName(String applicationName) {
+// this.applicationName = applicationName;
+// }
+// public List<String> getInputParameters() {
+// return inputParameters;
+// }
+// public void setInputParameters(List<String> inputParameters) {
+// this.inputParameters = inputParameters;
+// }
+// public String getWorkingDir() {
+// return workingDir;
+// }
+// public void setWorkingDir(String workingDir) {
+// this.workingDir = workingDir;
+// }
+// public String getInputDir() {
+// return inputDir;
+// }
+// public void setInputDir(String inputDir) {
+// this.inputDir = inputDir;
+// }
+// public String getOutputDir() {
+// return outputDir;
+// }
+// public void setOutputDir(String outputDir) {
+// this.outputDir = outputDir;
+// }
+// }
+// public LocalProvider(){
+// cmdList = new ArrayList<String>();
+// }
+//
+// public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+// super.initialize(jobExecutionContext);
+//
+// // build command with all inputs
+// buildCommand();
+// initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription());
+//
+// // extra environment variables
+// builder.environment().put(GFacConstants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir());
+// builder.environment().put(GFacConstants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir());
+//
+// // set working directory
+// builder.directory(new File(jobExecutionContext.getWorkingDir()));
+//
+// // log info
+// log.info("Command = " + InputUtils.buildCommand(cmdList));
+// log.info("Working dir = " + builder.directory());
+// /*for (String key : builder.environment().keySet()) {
+// log.info("Env[" + key + "] = " + builder.environment().get(key));
+// }*/
+// }
+//
+// public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+// JobDetails jobDetails = new JobDetails();
+// try {
+// jobId = jobExecutionContext.getTaskData().getTaskID();
+// jobDetails.setJobID(jobId);
+// jobDetails.setJobDescription(jobExecutionContext.getApplicationContext()
+// .getApplicationDeploymentDescription().getAppDeploymentDescription());
+// jobExecutionContext.setJobDetails(jobDetails);
+// GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SETUP);
+// // running cmd
+// Process process = builder.start();
+//
+// Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput());
+// Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), jobExecutionContext.getStandardError());
+//
+// // start output threads
+// standardOutWriter.setDaemon(true);
+// standardErrorWriter.setDaemon(true);
+// standardOutWriter.start();
+// standardErrorWriter.start();
+//
+// int returnValue = process.waitFor();
+//
+// // make sure other two threads are done
+// standardOutWriter.join();
+// standardErrorWriter.join();
+//
+// /*
+// * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+// * just provide warning in the log messages
+// */
+// if (returnValue != 0) {
+// log.error("Process finished with non zero return value. Process may have failed");
+// } else {
+// log.info("Process finished with return value of zero.");
+// }
+//
+// StringBuffer buf = new StringBuffer();
+// buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+// .append(" on the localHost, working directory = ").append(jobExecutionContext.getWorkingDir())
+// .append(" tempDirectory = ").append(jobExecutionContext.getScratchLocation()).append(" With the status ")
+// .append(String.valueOf(returnValue));
+//
+// log.info(buf.toString());
+//
+// // updating the job status to complete because there's nothing to monitor in local jobs
+//// MonitorID monitorID = createMonitorID(jobExecutionContext);
+// JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
+// jobExecutionContext.getTaskData().getTaskID(),
+// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+// jobExecutionContext.getExperimentID(),
+// jobExecutionContext.getGatewayID());
+// jobExecutionContext.getLocalEventPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
+// } catch (IOException io) {
+// throw new GFacProviderException(io.getMessage(), io);
+// } catch (InterruptedException e) {
+// throw new GFacProviderException(e.getMessage(), e);
+// }catch (GFacException e) {
+// throw new GFacProviderException(e.getMessage(), e);
+// }
+// }
+//
+//// private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) {
+//// MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId,
+//// jobExecutionContext.getTaskData().getTaskID(),
+//// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+//// jobExecutionContext.getExperiment().getUserName(),jobId);
+//// return monitorID;
+//// }
+//
+//// private void saveApplicationJob(JobExecutionContext jobExecutionContext)
+//// throws GFacProviderException {
+//// ApplicationDeploymentDescriptionType app = jobExecutionContext.
+//// getApplicationContext().getApplicationDeploymentDescription().getType();
+//// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+//// appJob.setJobId(jobId);
+//// LocalProviderJobData data = new LocalProviderJobData();
+//// data.setApplicationName(app.getExecutableLocation());
+//// data.setInputDir(app.getInputDataDirectory());
+//// data.setOutputDir(app.getOutputDataDirectory());
+//// data.setWorkingDir(builder.directory().toString());
+//// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
+//// ByteArrayOutputStream stream = new ByteArrayOutputStream();
+//// JAXB.marshal(data, stream);
+//// appJob.setJobData(stream.toString());
+//// appJob.setSubmittedTime(Calendar.getInstance().getTime());
+//// appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+//// appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+//// GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+//// }
+//
+// public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+// try {
+// List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
+// String stdOutStr = GFacUtils.readFileToString(jobExecutionContext.getStandardOutput());
+// String stdErrStr = GFacUtils.readFileToString(jobExecutionContext.getStandardError());
+// Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+// OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+// TaskDetails taskDetails = (TaskDetails) experimentCatalog.get(ExperimentCatalogModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
+// if (taskDetails != null){
+// taskDetails.setApplicationOutputs(outputArray);
+// experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID());
+// }
+// experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+// TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+// jobExecutionContext.getExperimentID(),
+// jobExecutionContext.getGatewayID());
+// jobExecutionContext.getLocalEventPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
+// } catch (XmlException e) {
+// throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
+// } catch (IOException io) {
+// throw new GFacProviderException(io.getMessage(), io);
+// } catch (Exception e){
+// throw new GFacProviderException("Error in retrieving results",e);
+// }
+// }
+//
+// public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
+// throw new NotImplementedException();
+// }
+//
+// @Override
+// public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+// // TODO: Auto generated method body.
+// }
+//
+// @Override
+// public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+// // TODO: Auto generated method body.
+// }
+//
+//
+// private void buildCommand() {
+// cmdList.add(jobExecutionContext.getExecutablePath());
+// Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters();
+//
+// // sort the inputs first and then build the command List
+// Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+// @Override
+// public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+// return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+// }
+// };
+// Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+// for (Object object : inputParameters.values()) {
+// if (object instanceof InputDataObjectType) {
+// InputDataObjectType inputDOT = (InputDataObjectType) object;
+// sortedInputSet.add(inputDOT);
+// }
+// }
+// for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+// if (inputDataObjectType.getApplicationArgument() != null
+// && !inputDataObjectType.getApplicationArgument().equals("")) {
+// cmdList.add(inputDataObjectType.getApplicationArgument());
+// }
+//
+// if (inputDataObjectType.getValue() != null
+// && !inputDataObjectType.getValue().equals("")) {
+// cmdList.add(inputDataObjectType.getValue());
+// }
+// }
+//
+// }
+//
+// private void initProcessBuilder(ApplicationDeploymentDescription app){
+// builder = new ProcessBuilder(cmdList);
+//
+// List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
+// if (setEnvironment != null) {
+// for (SetEnvPaths envPath : setEnvironment) {
+// Map<String,String> builderEnv = builder.environment();
+// builderEnv.put(envPath.getName(), envPath.getValue());
+// }
+// }
+// }
+//
+// public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+//
+// }
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/df3fbe6a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
index 2b45df7..8d7cd8d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
@@ -1,51 +1,51 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.local.utils;
-
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-public class LocalProviderUtil {
- private static final Logger log = LoggerFactory.getLogger(LocalProviderUtil.class);
-
- private void makeFileSystemDir(String dir) throws GFacProviderException {
- File f = new File(dir);
- if (f.isDirectory() && f.exists()) {
- return;
- } else if (!new File(dir).mkdir()) {
- throw new GFacProviderException("Cannot make directory " + dir);
- }
- }
-
- public void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- log.info("working diectroy = " + jobExecutionContext.getWorkingDir());
- log.info("temp directory = " + jobExecutionContext.getScratchLocation());
- makeFileSystemDir(jobExecutionContext.getWorkingDir());
- makeFileSystemDir(jobExecutionContext.getScratchLocation());
- makeFileSystemDir(jobExecutionContext.getInputDir());
- makeFileSystemDir(jobExecutionContext.getOutputDir());
- }
-
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.gfac.local.utils;
+//
+//import org.apache.airavata.gfac.core.context.JobExecutionContext;
+//import org.apache.airavata.gfac.core.provider.GFacProviderException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.File;
+//
+//public class LocalProviderUtil {
+// private static final Logger log = LoggerFactory.getLogger(LocalProviderUtil.class);
+//
+// private void makeFileSystemDir(String dir) throws GFacProviderException {
+// File f = new File(dir);
+// if (f.isDirectory() && f.exists()) {
+// return;
+// } else if (!new File(dir).mkdir()) {
+// throw new GFacProviderException("Cannot make directory " + dir);
+// }
+// }
+//
+// public void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+// log.info("working diectroy = " + jobExecutionContext.getWorkingDir());
+// log.info("temp directory = " + jobExecutionContext.getScratchLocation());
+// makeFileSystemDir(jobExecutionContext.getWorkingDir());
+// makeFileSystemDir(jobExecutionContext.getScratchLocation());
+// makeFileSystemDir(jobExecutionContext.getInputDir());
+// makeFileSystemDir(jobExecutionContext.getOutputDir());
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/df3fbe6a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index 68fb39c..587bf46 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -1,229 +1,229 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.ssh.handler;
-
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.AbstractHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
-import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.experiment.*;
-import org.apache.airavata.registry.cpi.ExpCatChildDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.*;
-
-/**
- * This handler will copy input data from gateway machine to airavata
- * installed machine, later running handlers can copy the input files to computing resource
- * <Handler class="AdvancedSCPOutputHandler">
- * <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
- * <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
- * <property name="userName" value="airavata"/>
- * <property name="hostName" value="gw98.iu.xsede.org"/>
- * <property name="inputPath" value="/home/airavata/outputData"/>
- */
-public class AdvancedSCPInputHandler extends AbstractHandler {
- private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
- public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
- public static final int DEFAULT_SSH_PORT = 22;
-
- private String password = null;
-
- private String publicKeyPath;
-
- private String passPhrase;
-
- private String privateKeyPath;
-
- private String userName;
-
- private String hostName;
-
- private String inputPath;
-
- public void initProperties(Properties properties) throws GFacHandlerException {
- password = (String) properties.get("password");
- passPhrase = (String) properties.get("passPhrase");
- privateKeyPath = (String) properties.get("privateKeyPath");
- publicKeyPath = (String) properties.get("publicKeyPath");
- userName = (String) properties.get("userName");
- hostName = (String) properties.get("hostName");
- inputPath = (String) properties.get("inputPath");
- }
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- super.invoke(jobExecutionContext);
- int index = 0;
- int oldIndex = 0;
- List<String> oldFiles = new ArrayList<String>();
- MessageContext inputNew = new MessageContext();
- StringBuffer data = new StringBuffer("|");
- RemoteCluster remoteCluster = null;
-
- try {
- String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
- if (pluginData != null) {
- try {
- oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
- oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
- if (oldIndex == oldFiles.size()) {
- log.info("Old data looks good !!!!");
- } else {
- oldIndex = 0;
- oldFiles.clear();
- }
- } catch (NumberFormatException e) {
- log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
- }
- }
-
- AuthenticationInfo authenticationInfo = null;
- if (password != null) {
- authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
- } else {
- authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
- this.passPhrase);
- }
-
- // Server info
- String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
- if (index < oldIndex) {
- parentPath = oldFiles.get(index);
- data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
- } else {
- (new File(parentPath)).mkdirs();
- StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString());
- GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
- }
- DataTransferDetails detail = new DataTransferDetails();
- TransferStatus status = new TransferStatus();
- // here doesn't matter what the job manager is because we are only doing some file handling
- // not really dealing with monitoring or job submission, so we pa
-
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName : parameters) {
- InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
- String paramValue = inputParamType.getValue();
- // TODO: Review this with type
- if (inputParamType.getType() == DataType.URI) {
- try {
- URL file = new URL(paramValue);
- String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT;
- GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT);
- remoteCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getRemoteCluster();
- paramValue = file.getPath();
- } catch (MalformedURLException e) {
- String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
- GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT);
- remoteCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getRemoteCluster();
- log.error(e.getLocalizedMessage(), e);
- }
-
- if (index < oldIndex) {
- log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
- inputParamType.setValue(oldFiles.get(index));
- data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
- } else {
- String stageInputFile = stageInputFiles(remoteCluster, paramValue, parentPath);
- inputParamType.setValue(stageInputFile);
- StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
- status.setTransferState(TransferState.UPLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Input Data Staged: " + stageInputFile);
- experimentCatalog.add(ExpCatChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
- }
- }
- // FIXME: what is the thrift model DataType equivalent for URIArray type?
-// else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
-// List<String> newFiles = new ArrayList<String>();
-// for (String paramValueEach : split) {
-// try {
-// URL file = new URL(paramValue);
-// this.userName = file.getUserInfo();
-// this.hostName = file.getHost();
-// paramValueEach = file.getPath();
-// } catch (MalformedURLException e) {
-// log.error(e.getLocalizedMessage(), e);
-// }
-// if (index < oldIndex) {
-// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
-// newFiles.add(oldFiles.get(index));
-// data.append(oldFiles.get(index++)).append(",");
-// } else {
-// String stageInputFiles = stageInputFiles(remoteCluster, paramValueEach, parentPath);
-// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
-// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-// newFiles.add(stageInputFiles);
-// }
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.gfac.ssh.handler;
+//
+//import org.apache.airavata.gfac.core.GFacException;
+//import org.apache.airavata.gfac.core.SSHApiException;
+//import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+//import org.apache.airavata.gfac.core.context.JobExecutionContext;
+//import org.apache.airavata.gfac.core.context.MessageContext;
+//import org.apache.airavata.gfac.core.handler.AbstractHandler;
+//import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+//import org.apache.airavata.gfac.core.GFacUtils;
+//import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+//import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+//import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+//import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+//import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+//import org.apache.airavata.model.appcatalog.appinterface.DataType;
+//import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+//import org.apache.airavata.model.experiment.*;
+//import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.File;
+//import java.io.PrintWriter;
+//import java.io.StringWriter;
+//import java.net.MalformedURLException;
+//import java.net.URL;
+//import java.util.*;
+//
+///**
+// * This handler will copy input data from gateway machine to airavata
+// * installed machine, later running handlers can copy the input files to computing resource
+// * <Handler class="AdvancedSCPOutputHandler">
+// * <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+// * <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+// * <property name="userName" value="airavata"/>
+// * <property name="hostName" value="gw98.iu.xsede.org"/>
+// * <property name="inputPath" value="/home/airavata/outputData"/>
+// */
+//public class AdvancedSCPInputHandler extends AbstractHandler {
+// private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
+// public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
+// public static final int DEFAULT_SSH_PORT = 22;
+//
+// private String password = null;
+//
+// private String publicKeyPath;
+//
+// private String passPhrase;
+//
+// private String privateKeyPath;
+//
+// private String userName;
+//
+// private String hostName;
+//
+// private String inputPath;
+//
+// public void initProperties(Properties properties) throws GFacHandlerException {
+// password = (String) properties.get("password");
+// passPhrase = (String) properties.get("passPhrase");
+// privateKeyPath = (String) properties.get("privateKeyPath");
+// publicKeyPath = (String) properties.get("publicKeyPath");
+// userName = (String) properties.get("userName");
+// hostName = (String) properties.get("hostName");
+// inputPath = (String) properties.get("inputPath");
+// }
+//
+// public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+// super.invoke(jobExecutionContext);
+// int index = 0;
+// int oldIndex = 0;
+// List<String> oldFiles = new ArrayList<String>();
+// MessageContext inputNew = new MessageContext();
+// StringBuffer data = new StringBuffer("|");
+// RemoteCluster remoteCluster = null;
+//
+// try {
+// String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
+// if (pluginData != null) {
+// try {
+// oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
+// oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
+// if (oldIndex == oldFiles.size()) {
+// log.info("Old data looks good !!!!");
+// } else {
+// oldIndex = 0;
+// oldFiles.clear();
// }
-// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+// } catch (NumberFormatException e) {
+// log.error("Previously stored data " + pluginData + " is wrong so we continue the operations");
// }
- inputNew.getParameters().put(paramName, inputParamType);
- }
- } catch (Exception e) {
- log.error(e.getMessage());
- try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
- }
- throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
- }
- jobExecutionContext.setInMessageContext(inputNew);
- }
-
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- this.invoke(jobExecutionContext);
- }
-
- private String stageInputFiles(RemoteCluster remoteCluster, String paramValue, String parentPath) throws GFacException {
- try {
- remoteCluster.scpFrom(paramValue, parentPath);
- return "file://" + parentPath + File.separator + (new File(paramValue)).getName();
- } catch (SSHApiException e) {
- log.error("Error tranfering remote file to local file, remote path: " + paramValue);
- throw new GFacException(e);
- }
- }
-}
+// }
+//
+// AuthenticationInfo authenticationInfo = null;
+// if (password != null) {
+// authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+// } else {
+// authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+// this.passPhrase);
+// }
+//
+// // Server info
+// String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
+// if (index < oldIndex) {
+// parentPath = oldFiles.get(index);
+// data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+// } else {
+// (new File(parentPath)).mkdirs();
+// StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString());
+// GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+// }
+// DataTransferDetails detail = new DataTransferDetails();
+// TransferStatus status = new TransferStatus();
+// // here doesn't matter what the job manager is because we are only doing some file handling
+// // not really dealing with monitoring or job submission, so we pa
+//
+// MessageContext input = jobExecutionContext.getInMessageContext();
+// Set<String> parameters = input.getParameters().keySet();
+// for (String paramName : parameters) {
+// InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
+// String paramValue = inputParamType.getValue();
+// // TODO: Review this with type
+// if (inputParamType.getType() == DataType.URI) {
+// try {
+// URL file = new URL(paramValue);
+// String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT;
+// GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT);
+// remoteCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getRemoteCluster();
+// paramValue = file.getPath();
+// } catch (MalformedURLException e) {
+// String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
+// GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT);
+// remoteCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getRemoteCluster();
+// log.error(e.getLocalizedMessage(), e);
+// }
+//
+// if (index < oldIndex) {
+// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+// inputParamType.setValue(oldFiles.get(index));
+// data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+// } else {
+// String stageInputFile = stageInputFiles(remoteCluster, paramValue, parentPath);
+// inputParamType.setValue(stageInputFile);
+// StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
+// status.setTransferState(TransferState.UPLOAD);
+// detail.setTransferStatus(status);
+// detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+// experimentCatalog.add(ExpCatChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+//
+// GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+// }
+// }
+// // FIXME: what is the thrift model DataType equivalent for URIArray type?
+//// else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+//// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+//// List<String> newFiles = new ArrayList<String>();
+//// for (String paramValueEach : split) {
+//// try {
+//// URL file = new URL(paramValue);
+//// this.userName = file.getUserInfo();
+//// this.hostName = file.getHost();
+//// paramValueEach = file.getPath();
+//// } catch (MalformedURLException e) {
+//// log.error(e.getLocalizedMessage(), e);
+//// }
+//// if (index < oldIndex) {
+//// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+//// newFiles.add(oldFiles.get(index));
+//// data.append(oldFiles.get(index++)).append(",");
+//// } else {
+//// String stageInputFiles = stageInputFiles(remoteCluster, paramValueEach, parentPath);
+//// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+//// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+//// newFiles.add(stageInputFiles);
+//// }
+//// }
+//// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+//// }
+// inputNew.getParameters().put(paramName, inputParamType);
+// }
+// } catch (Exception e) {
+// log.error(e.getMessage());
+// try {
+// StringWriter errors = new StringWriter();
+// e.printStackTrace(new PrintWriter(errors));
+// GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+// } catch (GFacException e1) {
+// log.error(e1.getLocalizedMessage());
+// }
+// throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+// }
+// jobExecutionContext.setInMessageContext(inputNew);
+// }
+//
+// public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+// this.invoke(jobExecutionContext);
+// }
+//
+// private String stageInputFiles(RemoteCluster remoteCluster, String paramValue, String parentPath) throws GFacException {
+// try {
+// remoteCluster.scpFrom(paramValue, parentPath);
+// return "file://" + parentPath + File.separator + (new File(paramValue)).getName();
+// } catch (SSHApiException e) {
+// log.error("Error tranfering remote file to local file, remote path: " + paramValue);
+// throw new GFacException(e);
+// }
+// }
+//}