Posted to commits@airavata.apache.org by la...@apache.org on 2013/02/26 21:55:36 UTC

svn commit: r1450409 - in /airavata/trunk/modules/gfac-core: ./ src/main/java/org/apache/airavata/gfac/handler/ src/main/java/org/apache/airavata/gfac/provider/impl/ src/main/java/org/apache/airavata/gfac/provider/utils/

Author: lahiru
Date: Tue Feb 26 20:55:36 2013
New Revision: 1450409

URL: http://svn.apache.org/r1450409
Log:
Adding cloud bursting code to trunk.

Added:
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HDFSDataMovementHandler.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HadoopDeploymentHandler.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/HadoopUtils.java
Modified:
    airavata/trunk/modules/gfac-core/pom.xml

Modified: airavata/trunk/modules/gfac-core/pom.xml
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/pom.xml?rev=1450409&r1=1450408&r2=1450409&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/pom.xml (original)
+++ airavata/trunk/modules/gfac-core/pom.xml Tue Feb 26 20:55:36 2013
@@ -8,8 +8,7 @@
     ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under 
     the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.airavata</groupId>
         <artifactId>airavata</artifactId>
@@ -25,14 +24,27 @@
 
     <dependencies>
         <dependency>
+            <groupId>cog-globus</groupId>
+            <artifactId>cog-jglobus</artifactId>
+            <version>1.8.0-bc146</version>
+        </dependency>
+        <!-- Experimental dependencies for XSP integration
+        <dependency>
             <groupId>cog-jglobus</groupId>
             <artifactId>cog-jglobus</artifactId>
-            <version>1.8.0_1</version>
+            <version>1.8.0-mod</version>
+        </dependency>
+        <dependency>
+            <groupId>edu.iub.soic.damsl</groupId>
+            <artifactId>jXSP</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
         </dependency>
+        -->
         <dependency>
             <groupId>cog-globus</groupId>
             <artifactId>puretls</artifactId>
             <type>jar</type>
+            <!--  <version>0.9b4-1</version> -->
             <version>1.0</version>
         </dependency>
         <dependency>
@@ -45,12 +57,6 @@
             <artifactId>cryptix-asn1</artifactId>
             <version>1.0</version>
         </dependency>
-        <dependency>
-            <groupId>commons-configuration</groupId>
-            <artifactId>commons-configuration</artifactId>
-            <version>1.6</version>
-        </dependency>
-
         <!-- Logging -->
         <dependency>
             <groupId>org.slf4j</groupId>
@@ -58,6 +64,11 @@
         </dependency>
 
         <!-- GFAC schemas -->
+        <!--dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-gfac-schema-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency-->
         <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-client-api</artifactId>
@@ -84,11 +95,11 @@
         </dependency>
 
         <!-- Amazon -->
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk</artifactId>
-            <version>1.3.20</version>
-        </dependency>
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>aws-java-sdk</artifactId>
+			<version>1.3.20</version>
+		</dependency>
 
         <!-- Test -->
         <dependency>
@@ -96,16 +107,16 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <scope>test</scope>
-        </dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>jcl-over-slf4j</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<scope>test</scope>
+		</dependency>
         <dependency>
             <groupId>j2ssh</groupId>
             <artifactId>j2ssh-core</artifactId>
@@ -123,19 +134,69 @@
             <groupId>eu.unicore</groupId>
             <artifactId>ogsabes-client</artifactId>
             <version>1.6.0-SNAPSHOT</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
+	<exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+         <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
         </dependency>
+
+        <!-- unicore dependencies finished -->
+
         <!-- Guava -->
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>12.0</version>
         </dependency>
+
+       <!-- Hadoop provider related dependencies -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>1.0.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>1.0.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.whirr</groupId>
+            <artifactId>whirr-core</artifactId>
+            <version>0.7.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.whirr</groupId>
+            <artifactId>whirr-hadoop</artifactId>
+            <version>0.7.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.8.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.44-1</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+            <version>1.7</version>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.jopt-simple</groupId>
+            <artifactId>jopt-simple</artifactId>
+            <version>3.2</version>
+        </dependency>
     </dependencies>
 
 </project>

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HDFSDataMovementHandler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HDFSDataMovementHandler.java?rev=1450409&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HDFSDataMovementHandler.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HDFSDataMovementHandler.java Tue Feb 26 20:55:36 2013
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.handler;
+
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.handler.GFacHandler;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.provider.utils.HadoopUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.HadoopApplicationDeploymentDescriptionType;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class HDFSDataMovementHandler implements GFacHandler {
+    private static final Logger logger = LoggerFactory.getLogger(HDFSDataMovementHandler.class);
+
+    private boolean isWhirrBasedDeployment = false;
+    private File hadoopConfigDir;
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+        if(inMessageContext.getParameter("HADOOP_DEPLOYMENT_TYPE").equals("WHIRR")){
+            isWhirrBasedDeployment = true;
+        } else {
+            String hadoopConfigDirPath = (String)inMessageContext.getParameter("HADOOP_CONFIG_DIR");
+            File hadoopConfigDir = new File(hadoopConfigDirPath);
+            if (!hadoopConfigDir.exists()){
+                throw new GFacHandlerException("Specified hadoop configuration directory doesn't exist.");
+            } else if (FileUtils.listFiles(hadoopConfigDir, null, null).size() <= 0){
+                throw new GFacHandlerException("Cannot find any hadoop configuration files inside specified directory.");
+            }
+
+            this.hadoopConfigDir = hadoopConfigDir;
+        }
+
+        if(jobExecutionContext.isInPath()){
+            try {
+                handleInPath(jobExecutionContext);
+            } catch (IOException e) {
+                throw new GFacHandlerException("Error while copying input data from local file system to HDFS.",e);
+            }
+        } else {
+            handleOutPath(jobExecutionContext);
+        }
+    }
+
+    private void handleInPath(JobExecutionContext jobExecutionContext) throws GFacHandlerException, IOException {
+        ApplicationDeploymentDescriptionType appDepDesc =
+                jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+        HadoopApplicationDeploymentDescriptionType hadoopAppDesc =
+                (HadoopApplicationDeploymentDescriptionType)appDepDesc;
+        if(appDepDesc.isSetInputDataDirectory() && isInputDataDirectoryLocal(appDepDesc)){
+            Configuration hadoopConf = HadoopUtils.createHadoopConfiguration(jobExecutionContext, isWhirrBasedDeployment, hadoopConfigDir);
+            FileSystem hdfs = FileSystem.get(hadoopConf);
+            hdfs.copyFromLocalFile(new Path(appDepDesc.getInputDataDirectory()),
+                    new Path(hadoopAppDesc.getHadoopJobConfiguration().getHdfsInputDirectory()));
+        }
+    }
+
+    private boolean isInputDataDirectoryLocal(ApplicationDeploymentDescriptionType appDepDesc){
+        String inputDataDirectoryPath = appDepDesc.getInputDataDirectory();
+        File inputDataDirectory = new File(inputDataDirectoryPath);
+        if(inputDataDirectory.exists() && FileUtils.listFiles(inputDataDirectory, null, null).size() > 0){
+            return true;
+        }
+
+        return false;
+    }
+
+    private void handleOutPath(JobExecutionContext jobExecutionContext){}
+}
\ No newline at end of file
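
handleOutPath() above is left as an empty stub in this commit. For reference, a minimal sketch (not part of the committed diff) of what staging job results back out of HDFS could look like, reusing the imports already present in this file; getHdfsOutputDirectory() is an assumed accessor used only for illustration:

    // Sketch only: copy job output from HDFS back to the local output directory.
    // getHdfsOutputDirectory() is assumed here; the actual schema accessor may differ.
    private void handleOutPath(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
        try {
            ApplicationDeploymentDescriptionType appDepDesc =
                    jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
            HadoopApplicationDeploymentDescriptionType hadoopAppDesc =
                    (HadoopApplicationDeploymentDescriptionType) appDepDesc;
            Configuration hadoopConf = HadoopUtils.createHadoopConfiguration(
                    jobExecutionContext, isWhirrBasedDeployment, hadoopConfigDir);
            FileSystem hdfs = FileSystem.get(hadoopConf);
            // copyToLocalFile(src, dst) pulls the HDFS output directory down to the local file system.
            hdfs.copyToLocalFile(
                    new Path(hadoopAppDesc.getHadoopJobConfiguration().getHdfsOutputDirectory()),
                    new Path(appDepDesc.getOutputDataDirectory()));
        } catch (IOException e) {
            throw new GFacHandlerException("Error while copying output data from HDFS to the local file system.", e);
        }
    }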

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HadoopDeploymentHandler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HadoopDeploymentHandler.java?rev=1450409&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HadoopDeploymentHandler.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/HadoopDeploymentHandler.java Tue Feb 26 20:55:36 2013
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.handler;
+
+import com.google.common.io.Files;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.schemas.gfac.HadoopHostType;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.whirr.Cluster;
+import org.apache.whirr.ClusterController;
+import org.apache.whirr.ClusterControllerFactory;
+import org.apache.whirr.ClusterSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.*;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.whirr.ClusterSpec.Property.*;
+import static org.apache.whirr.ClusterSpec.Property.INSTANCE_TEMPLATES;
+import static org.apache.whirr.ClusterSpec.Property.PRIVATE_KEY_FILE;
+
+/**
+ * This handler takes care of deploying Hadoop in the cloud (in cloud bursting scenarios) and
+ * deploying Hadoop in a local cluster. If a Hadoop cluster already exists, it skips the
+ * cluster setup and just uses the Hadoop configuration provided by the user.
+ */
+public class HadoopDeploymentHandler implements GFacHandler {
+    private static final Logger logger = LoggerFactory.getLogger("hadoop-dep-handler");
+
+    /**
+     * Once invoked, this method will deploy Hadoop in a local cluster or in the cloud based on the
+     * configuration provided. If there is an already deployed Hadoop cluster, this will skip
+     * deployment.
+     *
+     * @param jobExecutionContext job execution context containing all the required configurations
+     *                            and runtime information.
+     * @throws GFacHandlerException
+     */
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        if(jobExecutionContext.isInPath()){
+            handleInPath(jobExecutionContext);
+        } else {
+            handleOutPath(jobExecutionContext);
+        }
+    }
+
+    private void handleInPath(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        HostDescription hostDescription =
+                jobExecutionContext.getApplicationContext().getHostDescription();
+        if (!isHadoopDeploymentAvailable(hostDescription)) {
+            // Temp directory to keep generated configuration files.
+            File tempDirectory = Files.createTempDir();
+            try {
+                File hadoopSiteXML = launchHadoopCluster(hostDescription, tempDirectory);
+                jobExecutionContext.getInMessageContext().addParameter("HADOOP_SITE_XML", hadoopSiteXML.getAbsolutePath());
+                jobExecutionContext.getInMessageContext().addParameter("HADOOP_DEPLOYMENT_TYPE", "WHIRR");
+                // TODO: Add hadoop-site.xml to job execution context.
+            } catch (IOException e) {
+                throw new GFacHandlerException("IO Error while processing configurations.",e);
+            } catch (ConfigurationException e) {
+                throw  new GFacHandlerException("Whirr configuration error.", e);
+            } catch (InterruptedException e) {
+                throw new GFacHandlerException("Hadoop cluster launch interrupted.", e);
+            } catch (TransformerException e) {
+                throw new GFacHandlerException("Error while creating hadoop-site.xml", e);
+            } catch (ParserConfigurationException e) {
+                throw new GFacHandlerException("Error while creating hadoop-site.xml", e);
+            }
+        } else {
+            jobExecutionContext.getInMessageContext().addParameter("HADOOP_DEPLOYMENT_TYPE",
+                    "MANUAL");
+            jobExecutionContext.getInMessageContext().addParameter("HADOOP_CONFIG_DIR",
+                    ((HadoopHostType)hostDescription.getType()).getHadoopConfigurationDirectory());
+            logger.info("Hadoop configuration is available. Skipping hadoop deployment.");
+            if(logger.isDebugEnabled()){
+                logger.debug("Hadoop configuration directory: " +
+                        getHadoopConfigDirectory(hostDescription));
+            }
+        }
+    }
+
+    private void handleOutPath(JobExecutionContext jobExecutionContext){
+        MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+        if(((String)inMessageContext.getParameter("HADOOP_DEPLOYMENT_TYPE")).equals("WHIRR")){
+            // TODO: Shutdown hadoop cluster.
+            logger.info("Shutdown hadoop cluster.");
+        }
+    }
+
+    private File launchHadoopCluster(HostDescription hostDescription, File workingDirectory)
+            throws IOException, GFacHandlerException, ConfigurationException, InterruptedException, TransformerException, ParserConfigurationException {
+        ClusterSpec hadoopClusterSpec =
+                whirrConfigurationToClusterSpec(hostDescription, workingDirectory);
+        ClusterController hadoopClusterController =
+                createClusterController(hadoopClusterSpec.getServiceName());
+        Cluster hadoopCluster =  hadoopClusterController.launchCluster(hadoopClusterSpec);
+
+        logger.info(String.format("Started cluster of %s instances.\n",
+                hadoopCluster.getInstances().size()));
+
+        File siteXML = new File(workingDirectory, "hadoop-site.xml");
+        clusterPropertiesToHadoopSiteXml(hadoopCluster.getConfiguration(), siteXML);
+
+        return siteXML;
+    }
+
+    private ClusterController createClusterController(String serviceName){
+        ClusterControllerFactory factory = new ClusterControllerFactory();
+        ClusterController controller = factory.create(serviceName);
+
+        if(controller == null){
+            logger.warn("Unable to find the service {}, using default.", serviceName);
+            controller = factory.create(null);
+        }
+
+        return controller;
+    }
+
+    private ClusterSpec whirrConfigurationToClusterSpec(HostDescription hostDescription,
+                                                        File workingDirectory) throws IOException, GFacHandlerException, ConfigurationException {
+        File whirrConfig = getWhirrConfigurationFile(hostDescription, workingDirectory);
+        CompositeConfiguration compositeConfiguration = new CompositeConfiguration();
+        Configuration configuration = new PropertiesConfiguration(whirrConfig);
+        compositeConfiguration.addConfiguration(configuration);
+
+        ClusterSpec hadoopClusterSpec = new ClusterSpec(compositeConfiguration);
+
+        for (ClusterSpec.Property required : EnumSet.of(CLUSTER_NAME, PROVIDER, IDENTITY, CREDENTIAL,
+                INSTANCE_TEMPLATES, PRIVATE_KEY_FILE)) {
+            if (hadoopClusterSpec.getConfiguration().getString(required.getConfigName()) == null) {
+                throw new IllegalArgumentException(String.format("Option '%s' not set.",
+                        required.getSimpleName()));
+            }
+        }
+
+        return hadoopClusterSpec;
+    }
+
+    private File getWhirrConfigurationFile(HostDescription hostDescription, File workingDirectory)
+            throws GFacHandlerException, IOException {
+        HadoopHostType hadoopHostDesc = (HadoopHostType)hostDescription.getType();
+        if(hadoopHostDesc.isSetWhirrConfiguration()){
+            HadoopHostType.WhirrConfiguration whirrConfig = hadoopHostDesc.getWhirrConfiguration();
+            if(whirrConfig.isSetConfigurationFile()){
+                File whirrConfigFile = new File(whirrConfig.getConfigurationFile());
+                if(!whirrConfigFile.exists()){
+                    throw new GFacHandlerException(
+                            "Specified whirr configuration file doesn't exist.");
+                }
+
+                FileUtils.copyFileToDirectory(whirrConfigFile, workingDirectory);
+
+                return new File(workingDirectory, whirrConfigFile.getName());
+            } else if(whirrConfig.isSetConfiguration()){
+                Properties whirrConfigProps =
+                        whirrConfigurationsToProperties(whirrConfig.getConfiguration());
+                File whirrConfigFile = new File(workingDirectory, "whirr-hadoop.config");
+                whirrConfigProps.store(
+                        new FileOutputStream(whirrConfigFile), null);
+
+                return whirrConfigFile;
+            }
+        }
+
+        throw new GFacHandlerException("Cannot find Whirr configurations. Whirr configuration "
+                + "is required if you don't have an already running Hadoop deployment.");
+    }
+
+    private Properties whirrConfigurationsToProperties(
+            HadoopHostType.WhirrConfiguration.Configuration configuration){
+        Properties whirrConfigProps = new Properties();
+
+        for(HadoopHostType.WhirrConfiguration.Configuration.Property property:
+                configuration.getPropertyArray()) {
+            whirrConfigProps.put(property.getName(), property.getValue());
+        }
+
+        return whirrConfigProps;
+    }
+
+    private void clusterPropertiesToHadoopSiteXml(Properties props, File hadoopSiteXml) throws ParserConfigurationException, TransformerException {
+        DocumentBuilderFactory domFactory = DocumentBuilderFactory.newInstance();
+        DocumentBuilder documentBuilder = domFactory.newDocumentBuilder();
+
+        Document hadoopSiteXmlDoc = documentBuilder.newDocument();
+
+        hadoopSiteXmlDoc.setXmlVersion("1.0");
+        hadoopSiteXmlDoc.setXmlStandalone(true);
+        hadoopSiteXmlDoc.createProcessingInstruction("xml-stylesheet", "type=\"text/xsl\" href=\"configuration.xsl\"");
+
+        Element configEle = hadoopSiteXmlDoc.createElement("configuration");
+
+        hadoopSiteXmlDoc.appendChild(configEle);
+
+        for(Map.Entry<Object, Object> entry : props.entrySet()){
+            addPropertyToConfiguration(entry, configEle, hadoopSiteXmlDoc);
+        }
+
+        saveDomToFile(hadoopSiteXmlDoc, hadoopSiteXml);
+    }
+
+    private void saveDomToFile(Document dom, File destFile) throws TransformerException {
+        Source source = new DOMSource(dom);
+
+        Result result = new StreamResult(destFile);
+
+        Transformer transformer = TransformerFactory.newInstance().newTransformer();
+        transformer.transform(source, result);
+    }
+
+    private void addPropertyToConfiguration(Map.Entry<Object, Object> entry, Element configElement, Document doc){
+        Element property = doc.createElement("property");
+        configElement.appendChild(property);
+
+        Element nameEle = doc.createElement("name");
+        nameEle.setTextContent(entry.getKey().toString());
+        property.appendChild(nameEle);
+
+        Element valueEle = doc.createElement("value");
+        valueEle.setTextContent(entry.getValue().toString());
+        property.appendChild(valueEle);
+    }
+
+    private boolean isHadoopDeploymentAvailable(HostDescription hostDescription) {
+        return ((HadoopHostType) hostDescription.getType()).isSetHadoopConfigurationDirectory();
+    }
+
+    private String getHadoopConfigDirectory(HostDescription hostDescription){
+        return ((HadoopHostType)hostDescription.getType()).getHadoopConfigurationDirectory();
+    }
+
+
+}
\ No newline at end of file
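
handleOutPath() in this handler only logs the shutdown step and leaves the actual teardown as a TODO. A minimal sketch (not part of the committed diff) of how a Whirr-launched cluster could be destroyed, assuming the ClusterSpec built in handleInPath() is retained (for example via the message context) so it can be reused on the out path:

    // Sketch only: tear down the Whirr-managed Hadoop cluster once the job has finished.
    // Assumes the ClusterSpec from handleInPath() was kept around for reuse.
    private void shutdownHadoopCluster(ClusterSpec hadoopClusterSpec) throws GFacHandlerException {
        try {
            ClusterController controller = createClusterController(hadoopClusterSpec.getServiceName());
            controller.destroyCluster(hadoopClusterSpec);
            logger.info("Whirr-based Hadoop cluster destroyed.");
        } catch (IOException e) {
            throw new GFacHandlerException("Error while shutting down the hadoop cluster.", e);
        } catch (InterruptedException e) {
            throw new GFacHandlerException("Hadoop cluster shutdown was interrupted.", e);
        }
    }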

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java?rev=1450409&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java Tue Feb 26 20:55:36 2013
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.provider.impl;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.provider.utils.HadoopUtils;
+import org.apache.airavata.schemas.gfac.HadoopApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.OutputParameterType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Executes the Hadoop job using the cluster configuration provided by the
+ * handlers in the in-flow.
+ */
+public class HadoopProvider implements GFacProvider{
+    private static final Logger logger = LoggerFactory.getLogger(HadoopProvider.class);
+
+    private boolean isWhirrBasedDeployment = false;
+    private File hadoopConfigDir;
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+        if(inMessageContext.getParameter("HADOOP_DEPLOYMENT_TYPE").equals("WHIRR")){
+            isWhirrBasedDeployment = true;
+        } else {
+            String hadoopConfigDirPath = (String)inMessageContext.getParameter("HADOOP_CONFIG_DIR");
+            File hadoopConfigDir = new File(hadoopConfigDirPath);
+            if (!hadoopConfigDir.exists()){
+                throw new GFacProviderException("Specified hadoop configuration directory doesn't exist.");
+            } else if (FileUtils.listFiles(hadoopConfigDir, null, null).size() <= 0){
+                throw new GFacProviderException("Cannot find any hadoop configuration files inside specified directory.");
+            }
+
+            this.hadoopConfigDir = hadoopConfigDir;
+        }
+    }
+
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        HadoopApplicationDeploymentDescriptionType hadoopAppDesc =
+                (HadoopApplicationDeploymentDescriptionType)jobExecutionContext
+                        .getApplicationContext().getApplicationDeploymentDescription().getType();
+        MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+        HadoopApplicationDeploymentDescriptionType.HadoopJobConfiguration jobConf = hadoopAppDesc.getHadoopJobConfiguration();
+
+        try{
+            // Preparing Hadoop configuration
+            Configuration hadoopConf = HadoopUtils.createHadoopConfiguration(
+                    jobExecutionContext, isWhirrBasedDeployment, hadoopConfigDir);
+
+            // Load jar containing map-reduce job implementation
+            ArrayList<URL> mapRedJars = new ArrayList<URL>();
+            mapRedJars.add(new File(jobConf.getJarLocation()).toURL());
+            URLClassLoader childClassLoader = new URLClassLoader(mapRedJars.toArray(new URL[mapRedJars.size()]),
+                    this.getClass().getClassLoader());
+
+            Job job = new Job(hadoopConf);
+
+            job.setJobName(jobConf.getJobName());
+
+            job.setOutputKeyClass(Class.forName(jobConf.getOutputKeyClass(), true, childClassLoader));
+            job.setOutputValueClass(Class.forName(jobConf.getOutputValueClass(), true, childClassLoader));
+
+            job.setMapperClass((Class<? extends Mapper>)Class.forName(jobConf.getMapperClass(), true, childClassLoader));
+            job.setCombinerClass((Class<? extends Reducer>) Class.forName(jobConf.getCombinerClass(), true, childClassLoader));
+            job.setReducerClass((Class<? extends Reducer>) Class.forName(jobConf.getReducerClass(), true, childClassLoader));
+
+            job.setInputFormatClass((Class<? extends InputFormat>)Class.forName(jobConf.getInputFormatClass(), true, childClassLoader));
+            job.setOutputFormatClass((Class<? extends OutputFormat>) Class.forName(jobConf.getOutputFormatClass(), true, childClassLoader));
+
+            FileInputFormat.setInputPaths(job, new Path(hadoopAppDesc.getInputDataDirectory()));
+            FileOutputFormat.setOutputPath(job, new Path(hadoopAppDesc.getOutputDataDirectory()));
+
+            job.waitForCompletion(true);
+            System.out.println(job.getTrackingURL());
+            if(jobExecutionContext.getOutMessageContext() == null){
+                jobExecutionContext.setOutMessageContext(new MessageContext());
+            }
+
+            OutputParameterType[] outputParametersArray = jobExecutionContext.getApplicationContext().
+                    getServiceDescription().getType().getOutputParametersArray();
+            for(OutputParameterType outparamType : outputParametersArray){
+                String paramName = outparamType.getParameterName();
+                if(paramName.equals("test-hadoop")){
+                    ActualParameter outParam = new ActualParameter();
+                    outParam.getType().changeType(StringParameterType.type);
+                    ((StringParameterType) outParam.getType()).setValue(job.getTrackingURL());
+                    jobExecutionContext.getOutMessageContext().addParameter("test-hadoop", outParam);
+                }
+            }
+        } catch (Exception e) {
+            String errMessage = "Error occurred during Map-Reduce job execution.";
+            logger.error(errMessage, e);
+            throw new GFacProviderException(errMessage, e);
+        }
+    }
+
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        // TODO: How to handle cluster shutdown. Best way is to introduce inPath/outPath to handler.
+    }
+}
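
For context, the tracking URL that HadoopProvider registers as the "test-hadoop" output parameter can be read back from the out message context by downstream code. A minimal sketch, assuming the usual getValue() counterpart of the setValue() call used above:

    // Sketch only: consume the tracking-URL output parameter registered by HadoopProvider.
    // "test-hadoop" is the hard-coded parameter name used in execute() above.
    MessageContext out = jobExecutionContext.getOutMessageContext();
    ActualParameter trackingUrlParam = (ActualParameter) out.getParameter("test-hadoop");
    String trackingUrl = ((StringParameterType) trackingUrlParam.getType()).getValue();
    logger.info("Map-Reduce job tracking URL: {}", trackingUrl);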

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/HadoopUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/HadoopUtils.java?rev=1450409&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/HadoopUtils.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/HadoopUtils.java Tue Feb 26 20:55:36 2013
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.provider.utils;
+
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.util.Collection;
+
+public class HadoopUtils {
+    public static Configuration createHadoopConfiguration(
+            JobExecutionContext jobExecutionContext,
+            boolean isWhirrBasedDeployment,
+            File hadoopConfigDir) throws FileNotFoundException {
+        MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
+        Configuration hadoopConf = new Configuration();
+
+        if(isWhirrBasedDeployment){
+            hadoopConf.addResource(new FileInputStream(
+                    new File((String)inMessageContext.getParameter("HADOOP_SITE_XML"))));
+        } else {
+            readHadoopClusterConfigurationFromDirectory(hadoopConfigDir, hadoopConf);
+        }
+
+        return hadoopConf;
+    }
+
+    private static void readHadoopClusterConfigurationFromDirectory(File localHadoopConfigurationDirectory, Configuration hadoopConf)
+            throws FileNotFoundException {
+        Collection hadoopConfigurationFiles =
+                FileUtils.listFiles(localHadoopConfigurationDirectory, null, false);
+        for (Object f : hadoopConfigurationFiles) {
+            hadoopConf.addResource(new FileInputStream((File)f));
+        }
+    }
+}
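
For the manual (non-Whirr) path, HADOOP_CONFIG_DIR is expected to point at a directory of Hadoop configuration XML files. A minimal, self-contained usage sketch under that assumption (the path and file names below are the usual Hadoop 1.x defaults, not something this commit mandates):

    import org.apache.commons.io.FileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    public class HadoopConfSmokeTest {
        public static void main(String[] args) throws IOException {
            // Hypothetical config directory containing core-site.xml, hdfs-site.xml and mapred-site.xml.
            File configDir = new File(args.length > 0 ? args[0] : "/etc/hadoop/conf");
            Configuration conf = new Configuration();
            for (Object f : FileUtils.listFiles(configDir, new String[]{"xml"}, false)) {
                conf.addResource(new FileInputStream((File) f));
            }
            // Print the default file system URI picked up from the loaded site files.
            System.out.println("Default FS: " + FileSystem.get(conf).getUri());
        }
    }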