You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/05/19 05:20:34 UTC

falcon git commit: FALCON-1853 : Add spark process workflow builder

Repository: falcon
Updated Branches:
  refs/heads/master 6d73fad44 -> 6e034074f


FALCON-1853 : Add spark process workflow builder

Following is the pull request that will perform the integration to execute the Spark application through Falcon. Please review.

XSD changes for integration are already committed through pull request #89 .

Author: peeyush b <pb...@hortonworks.com>

Reviewers: "Pavan Kumar Kolamuri <pa...@gmail.com> , Venkat Ranganathan <vr...@hortonworks.com>"

Closes #118 from peeyushb/FALCON-1853


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6e034074
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6e034074
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6e034074

Branch: refs/heads/master
Commit: 6e034074f65a6498af532fd0af143c21c410374a
Parents: 6d73fad
Author: peeyush b <pb...@hortonworks.com>
Authored: Thu May 19 10:49:13 2016 +0530
Committer: pavankumar526 <pa...@gmail.com>
Committed: Thu May 19 10:49:13 2016 +0530

----------------------------------------------------------------------
 common/pom.xml                                  |  26 +++
 .../org/apache/falcon/entity/ClusterHelper.java |   5 +
 .../entity/parser/ClusterEntityParser.java      |  16 ++
 .../entity/parser/ProcessEntityParser.java      |  32 ++++
 .../apache/falcon/entity/AbstractTestBase.java  |   1 +
 .../entity/parser/ClusterEntityParserTest.java  |   1 +
 .../entity/parser/ProcessEntityParserTest.java  |  19 ++
 .../resources/config/cluster/cluster-0.1.xml    |   1 +
 .../config/process/spark-process-0.1.xml        |  60 ++++++
 oozie/pom.xml                                   |  24 +++
 .../OozieOrchestrationWorkflowBuilder.java      |   4 +
 .../ProcessExecutionWorkflowBuilder.java        |   4 +-
 .../process/SparkProcessWorkflowBuilder.java    | 189 +++++++++++++++++++
 .../java/org/apache/falcon/util/OozieUtils.java |  27 +++
 .../resources/action/process/spark-action.xml   |  38 ++++
 .../OozieProcessWorkflowBuilderTest.java        |  48 +++++
 .../config/process/spark-process-0.1.xml        |  62 ++++++
 pom.xml                                         |   7 +
 18 files changed, 563 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index c54f9d8..1ead737 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -207,6 +207,32 @@
             <artifactId>validation-api</artifactId>
             <version>${javax-validation.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>jackson-databind</artifactId>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>jackson-module-scala_2.10</artifactId>
+                    <groupId>com.fasterxml.jackson.module</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty.orbit</groupId>
+                    <artifactId>javax.servlet</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>net.java.dev.jets3t</groupId>
+            <artifactId>jets3t</artifactId>
+            <version>0.9.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index aff4405..9e16fa4 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -113,6 +113,11 @@ public final class ClusterHelper {
         return messageInterface == null ? NO_USER_BROKER_URL : messageInterface.getEndpoint();
     }
 
+    public static String getSparkMasterEndPoint(Cluster cluster) {
+        final Interface sparkInterface = getInterface(cluster, Interfacetype.SPARK);
+        return sparkInterface == null ? null : sparkInterface.getEndpoint();
+    }
+
     public static String getMessageBrokerImplClass(Cluster cluster) {
         if (cluster.getProperties() != null) {
             for (Property prop : cluster.getProperties().getProperties()) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index c3bdf3b..96ba748 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,6 +96,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         validateRegistryInterface(cluster);
         validateLocations(cluster);
         validateProperties(cluster);
+        validateSparkMasterInterface(cluster);
     }
 
     private void validateScheme(Cluster cluster, Interfacetype interfacetype)
@@ -232,6 +235,19 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         }
     }
 
+    protected void validateSparkMasterInterface(Cluster cluster) throws ValidationException {
+        final String sparkMasterUrl = ClusterHelper.getSparkMasterEndPoint(cluster);
+        if (StringUtils.isNotEmpty(sparkMasterUrl)) {
+            SparkConf sparkConf = new SparkConf();
+            sparkConf.setMaster(sparkMasterUrl).setAppName("Falcon Spark");
+
+            JavaSparkContext sc = new JavaSparkContext(sparkConf);
+            if (sc.startTime() == null) {
+                throw new ValidationException("Unable to reach Spark master URL:" + sparkMasterUrl);
+            }
+        }
+    }
+
     /**
      * Validate ACL if authorization is enabled.
      *

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 8edec5b..38fa3ae 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -32,12 +32,15 @@ import org.apache.falcon.entity.v0.process.Properties;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.ACL;
+import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.SparkAttributes;
+import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.DateUtil;
@@ -128,6 +131,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
         validateLateInputs(process);
         validateProcessSLA(process);
         validateHadoopQueue(process);
+        validateProcessEntity(process);
     }
 
 
@@ -374,4 +378,32 @@ public class ProcessEntityParser extends EntityParser<Process> {
         }
     }
 
+    protected void validateProcessEntity(Process process) throws FalconException {
+        validateSparkProcessEntity(process, process.getSparkAttributes());
+    }
+
+    private void validateSparkProcessEntity(Process process, SparkAttributes sparkAttributes) throws
+            FalconException {
+        Workflow workflow = process.getWorkflow();
+        if (workflow.getEngine() == EngineType.SPARK) {
+            if (sparkAttributes == null) {
+                throw new ValidationException(
+                        "For Spark Workflow engine Spark Attributes in Process Entity can't be null");
+            } else {
+                String clusterName = process.getClusters().getClusters().get(0).getName();
+                org.apache.falcon.entity.v0.cluster.Cluster cluster =
+                        ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
+                String clusterEntitySparkMaster = ClusterHelper.getSparkMasterEndPoint(cluster);
+                String processEntitySparkMaster = sparkAttributes.getMaster();
+                String sparkMaster = (processEntitySparkMaster == null)
+                        ? clusterEntitySparkMaster
+                        : processEntitySparkMaster;
+                if (StringUtils.isEmpty(sparkMaster)
+                        || StringUtils.isEmpty(sparkAttributes.getJar())) {
+                    throw new ValidationException("Spark master and jar/python file can't be null");
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index fd963e5..3745955 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -59,6 +59,7 @@ public class AbstractTestBase {
     protected static final String FEED4_XML = "/config/feed/feed-0.4.xml";
     protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
     protected static final String DATASOURCE_XML = "/config/datasource/datasource-0.1.xml";
+    protected static final String SPARK_PROCESS_XML = "/config/process/spark-process-0.1.xml";
     protected EmbeddedCluster dfsCluster;
     protected Configuration conf = new Configuration();
     private ConfigurationStore store;

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index 4b4b657..872c2f7 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -171,6 +171,7 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster);
         Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster);
         Mockito.doNothing().when(clusterEntityParser).validateLocations(cluster);
+        Mockito.doNothing().when(clusterEntityParser).validateSparkMasterInterface(cluster);
 
         // Good set of properties, should work
         clusterEntityParser.validateProperties(cluster);

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 64f62a5..c4bfff6 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -21,7 +21,9 @@ package org.apache.falcon.entity.parser;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -641,4 +643,21 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         process.getClusters().getClusters().get(0).getValidity().setEnd(null);
         parser.validate(process);
     }
+
+    @Test
+    public void testSparkProcessEntity() throws FalconException {
+        Process process = parser.parseAndValidate((ProcessEntityParserTest.class)
+                .getResourceAsStream(SPARK_PROCESS_XML));
+        Assert.assertEquals(process.getWorkflow().getEngine().value(), "spark");
+        Assert.assertNotNull(process.getWorkflow().getPath());
+        Cluster processCluster = process.getClusters().getClusters().get(0);
+        org.apache.falcon.entity.v0.cluster.Cluster cluster =
+                ConfigurationStore.get().get(EntityType.CLUSTER, processCluster.getName());
+        String clusterEntitySparkMaster = ClusterHelper.getSparkMasterEndPoint(cluster);
+        String processEntitySparkMaster = process.getSparkAttributes().getMaster();
+        String sparkMaster = (processEntitySparkMaster == null) ? clusterEntitySparkMaster : processEntitySparkMaster;
+
+        Assert.assertEquals(sparkMaster, "local");
+        parser.validate(process);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/src/test/resources/config/cluster/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/cluster/cluster-0.1.xml b/common/src/test/resources/config/cluster/cluster-0.1.xml
index 5e36f72..0ef317a 100644
--- a/common/src/test/resources/config/cluster/cluster-0.1.xml
+++ b/common/src/test/resources/config/cluster/cluster-0.1.xml
@@ -31,6 +31,7 @@
                    version="5.1.6"/>
         <interface type="registry" endpoint="http://localhost:48080/templeton/v1"
                    version="0.11.0"/>
+        <interface type="spark" endpoint="http://localhost:7070" version="1.6.1"/>
     </interfaces>
     <locations>
         <location name="staging" path="/projects/falcon/staging"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/common/src/test/resources/config/process/spark-process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/spark-process-0.1.xml b/common/src/test/resources/config/process/spark-process-0.1.xml
new file mode 100644
index 0000000..404e1b7
--- /dev/null
+++ b/common/src/test/resources/config/process/spark-process-0.1.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<process name="spark-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="testCluster">
+            <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+
+    <!-- what -->
+    <inputs>
+        <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
+        <input name="clicks" feed="clicksFeed" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <!-- In the workflow, the output path will be available in a variable 'outpath' -->
+        <output name="clicksOutput" feed="imp-click-join2" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <properties>
+        <property name="oozie.launcher.mapreduce.map.memory.mb" value="2072"/>
+        <property name="oozie.launcher.mapreduce.map.java.opts" value="-Xmx2500m"/>
+    </properties>
+
+    <workflow engine="spark" path="/falcon/test"/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark WordCount Application</name>
+        <class>org.apache.falcon.example.spark.SparkWordCount</class>
+        <jar>/falcon/test/workflow/lib/falcon-examples.jar</jar>
+        <spark-opts>--executor-core 1 --executor-memory 512m --driver-memory 512m</spark-opts>
+    </spark-attributes>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 928edc4..7e45b44 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -109,6 +109,11 @@
             <version>${joda.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -218,6 +223,25 @@
                         </configuration>
                     </execution>
                     <execution>
+                        <id>spark-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.spark</generatePackage>
+                            <schemas>
+                                <schema>
+                                    <dependencyResource>
+                                        <groupId>org.apache.oozie</groupId>
+                                        <artifactId>oozie-client</artifactId>
+                                        <resource>spark-action-0.1.xsd</resource>
+                                    </dependencyResource>
+                                </schema>
+                            </schemas>
+                        </configuration>
+                    </execution>
+                    <execution>
                         <id>bundle-gen</id>
                         <goals>
                             <goal>generate</goal>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 562627e..0801899 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -40,6 +40,7 @@ import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.NativeOozieProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
+import org.apache.falcon.oozie.process.SparkProcessWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.CREDENTIAL;
@@ -190,6 +191,9 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
             case HIVE:
                 return new HiveProcessWorkflowBuilder(process);
 
+            case SPARK:
+                return new SparkProcessWorkflowBuilder(process);
+
             default:
                 break;
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 7d5b331..5d2c43e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -213,7 +213,9 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
             configProperty.setValue((String) entry.getValue());
             configuration.add(configProperty);
 
-            paramList.add(entry.getKey() + "=" + entry.getValue());
+            if (paramList != null) {
+                paramList.add(entry.getKey() + "=" + entry.getValue());
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
new file mode 100644
index 0000000..dc5a491
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.spark.CONFIGURATION.Property;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBElement;
+import java.util.List;
+
+/**
+ * Builds orchestration workflow for process where engine is spark.
+ */
+public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+    private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml";
+
+    public SparkProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
+
+    @Override
+    protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
+        ACTION action = unmarshalAction(ACTION_TEMPLATE);
+        JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement = OozieUtils.unMarshalSparkAction(action);
+        org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
+
+        String sparkMasterURL = entity.getSparkAttributes().getMaster();
+        String sparkFilePath = entity.getSparkAttributes().getJar();
+        String sparkJobName = entity.getSparkAttributes().getName();
+        String sparkOpts = entity.getSparkAttributes().getSparkOpts();
+        String sparkClassName = entity.getSparkAttributes().getClazz();
+
+        String clusterEntitySparkMasterURL = getClusterEntitySparkMaster(cluster);
+
+        //Overriding cluster spark master url if defined in process entity
+        sparkMasterURL = (sparkMasterURL == null) ? clusterEntitySparkMasterURL : sparkMasterURL;
+        if (StringUtils.isBlank(sparkMasterURL)) {
+            throw new FalconException("Spark Master URL can'be empty");
+        }
+        sparkAction.setMaster(sparkMasterURL);
+        sparkAction.setName(sparkJobName);
+
+        addPrepareDeleteOutputPath(sparkAction);
+
+        if (StringUtils.isNotEmpty(sparkOpts)) {
+            sparkAction.setSparkOpts(sparkOpts);
+        }
+
+        if (StringUtils.isNotEmpty(sparkClassName)) {
+            sparkAction.setClazz(sparkClassName);
+        }
+
+        List<String> argList = sparkAction.getArg();
+        List<String> sparkArgs = entity.getSparkAttributes().getArgs();
+        if (sparkArgs != null) {
+            argList.addAll(sparkArgs);
+        }
+
+        addInputFeedsAsArgument(argList, cluster);
+        addOutputFeedsAsArgument(argList, cluster);
+
+        sparkAction.setJar(addUri(sparkFilePath, cluster));
+
+        setSparkLibFileToWorkflowLib(sparkFilePath, entity);
+        propagateEntityProperties(sparkAction);
+
+        OozieUtils.marshalSparkAction(action, actionJaxbElement);
+        return action;
+    }
+
+    private void setSparkLibFileToWorkflowLib(String sparkFile, Process entity) {
+        if (StringUtils.isEmpty(entity.getWorkflow().getLib())) {
+            entity.getWorkflow().setLib(sparkFile);
+        }
+    }
+
+    private void addPrepareDeleteOutputPath(org.apache.falcon.oozie.spark.ACTION sparkAction) throws FalconException {
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList();
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        org.apache.falcon.oozie.spark.PREPARE prepare = new org.apache.falcon.oozie.spark.PREPARE();
+        List<org.apache.falcon.oozie.spark.DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            org.apache.falcon.oozie.spark.DELETE delete = new org.apache.falcon.oozie.spark.DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            sparkAction.setPrepare(prepare);
+        }
+    }
+
+    private void propagateEntityProperties(org.apache.falcon.oozie.spark.ACTION sparkAction) {
+        CONFIGURATION conf = new CONFIGURATION();
+        super.propagateEntityProperties(conf, null);
+
+        List<Property> sparkConf = sparkAction.getConfiguration().getProperty();
+        for (CONFIGURATION.Property prop : conf.getProperty()) {
+            Property sparkProp = new Property();
+            sparkProp.setName(prop.getName());
+            sparkProp.setValue(prop.getValue());
+            sparkConf.add(sparkProp);
+        }
+    }
+
+    private void addInputFeedsAsArgument(List<String> argList, Cluster cluster) throws FalconException {
+        if (entity.getInputs() == null) {
+            return;
+        }
+
+        int numInputFeed = entity.getInputs().getInputs().size();
+        while (numInputFeed > 0) {
+            Input input = entity.getInputs().getInputs().get(numInputFeed-1);
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+            final String inputName = input.getName();
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                argList.add(0, "${" + inputName + "}");
+            }
+            numInputFeed--;
+        }
+    }
+
+    private void addOutputFeedsAsArgument(List<String> argList, Cluster cluster) throws FalconException {
+        if (entity.getOutputs() == null) {
+            return;
+        }
+
+        for(Output output : entity.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+            final String outputName = output.getName();
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                argList.add(argList.size(), "${" + outputName + "}");
+            }
+        }
+    }
+
+    private String addUri(String jarFile, Cluster cluster) throws FalconException {
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
+        Path jarFilePath = new Path(jarFile);
+        if (jarFilePath.isAbsoluteAndSchemeAuthorityNull()) {
+            return fs.makeQualified(jarFilePath).toString();
+        }
+        return jarFile;
+    }
+
+    private String getClusterEntitySparkMaster(Cluster cluster) {
+        return ClusterHelper.getSparkMasterEndPoint(cluster);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 708788b..fd94d8e 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -47,6 +47,7 @@ public final class OozieUtils {
     public static final JAXBContext CONFIG_JAXB_CONTEXT;
     protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
     protected static final JAXBContext SQOOP_ACTION_JAXB_CONTEXT;
+    protected static final JAXBContext SPARK_ACTION_JAXB_CONTEXT;
 
     static {
         try {
@@ -59,6 +60,8 @@ public final class OozieUtils {
                 org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
             SQOOP_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
                     org.apache.falcon.oozie.sqoop.ACTION.class.getPackage().getName());
+            SPARK_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
+                    org.apache.falcon.oozie.spark.ACTION.class.getPackage().getName());
         } catch (JAXBException e) {
             throw new RuntimeException("Unable to create JAXB context", e);
         }
@@ -125,4 +128,28 @@ public final class OozieUtils {
             throw new RuntimeException("Unable to marshall sqoop action.", e);
         }
     }
+
+    public static JAXBElement<org.apache.falcon.oozie.spark.ACTION> unMarshalSparkAction(
+            org.apache.falcon.oozie.workflow.ACTION wfAction) {
+        try {
+            Unmarshaller unmarshaller = SPARK_ACTION_JAXB_CONTEXT.createUnmarshaller();
+            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+            return (JAXBElement<org.apache.falcon.oozie.spark.ACTION>)
+                    unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to unmarshall spark action.", e);
+        }
+    }
+
+    public static  void marshalSparkAction(org.apache.falcon.oozie.workflow.ACTION wfAction,
+                                          JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionjaxbElement) {
+        try {
+            DOMResult sparkActionDOM = new DOMResult();
+            Marshaller marshaller = SPARK_ACTION_JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(actionjaxbElement, sparkActionDOM);
+            wfAction.setAny(((Document) sparkActionDOM.getNode()).getDocumentElement());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to marshall spark action.", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/oozie/src/main/resources/action/process/spark-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/process/spark-action.xml b/oozie/src/main/resources/action/process/spark-action.xml
new file mode 100644
index 0000000..d08e65f
--- /dev/null
+++ b/oozie/src/main/resources/action/process/spark-action.xml
@@ -0,0 +1,38 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<action name="user-action" xmlns="uri:oozie:workflow:0.3">
+    <spark xmlns="uri:oozie:spark-action:0.1">
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+        </configuration>
+        <master>#MASTER_URL#</master>
+        <name>#NAME#</name>
+        <jar>#JAR_FILE_PATH</jar>
+    </spark>
+    <ok to="succeeded-post-processing"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 8d824ba..85100e7 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -88,6 +88,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
     private static final String FEED_XML = "/config/feed/feed-0.1.xml";
     private static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
     private static final String PIG_PROCESS_XML = "/config/process/pig-process-0.1.xml";
+    private static final String SPARK_PROCESS_XML = "/config/process/spark-process-0.1.xml";
 
     private String hdfsUrl;
     private FileSystem fs;
@@ -324,6 +325,53 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
     }
 
+    @Test
+    public void testSparkProcess() throws Exception {
+
+        URL resource = this.getClass().getResource(SPARK_PROCESS_XML);
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+        Assert.assertEquals("spark", process.getWorkflow().getEngine().value());
+
+        prepare(process);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
+        testParentWorkflow(process, parentWorkflow);
+
+        ACTION sparkNode = getAction(parentWorkflow, "user-action");
+
+        JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement =
+                OozieUtils.unMarshalSparkAction(sparkNode);
+        org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
+        assertEquals(sparkAction.getMaster(), "local");
+        assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/spark-wordcount.jar");
+        List<String> argsList = sparkAction.getArg();
+        Input input = process.getInputs().getInputs().get(0);
+        Output output = process.getOutputs().getOutputs().get(0);
+        assertEquals(argsList.get(0), "${"+input.getName().toString()+"}");
+        assertEquals(argsList.get(argsList.size()-1), "${"+output.getName().toString()+"}");
+    }
+
     @Test (dataProvider = "secureOptions")
     public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception {
         StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/oozie/src/test/resources/config/process/spark-process-0.1.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/spark-process-0.1.xml b/oozie/src/test/resources/config/process/spark-process-0.1.xml
new file mode 100644
index 0000000..a94f807
--- /dev/null
+++ b/oozie/src/test/resources/config/process/spark-process-0.1.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<process name="spark-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
+        <input name="input" feed="clicks" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <!-- In the workflow, the output path will be available in a variable 'outpath' -->
+        <output name="output" feed="clicks" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <properties>
+        <property name="oozie.launcher.mapreduce.map.memory.mb" value="2072"/>
+        <property name="oozie.launcher.mapreduce.map.java.opts" value="-Xmx2500m"/>
+        <property name="mapred.job.priority" value="HIGH"/>
+    </properties>
+
+    <workflow engine="spark" path="/resources/action"/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark WordCount Application</name>
+        <class>org.examples.WordCount</class>
+        <jar>/resources/action/lib/spark-wordcount.jar</jar>
+        <spark-opts>--num-executors 1 --executor-memory 512m  --driver-memory 512m</spark-opts>
+    </spark-attributes>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6e034074/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1a651ea..20653bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,7 @@
         <oozie.forcebuild>false</oozie.forcebuild>
         <activemq.version>5.12.0</activemq.version>
         <hive.version>0.13.1</hive.version>
+        <spark.version>1.6.1</spark.version>
         <jetty.version>6.1.26</jetty.version>
         <jersey.version>1.9</jersey.version>
         <quartz.version>2.2.1</quartz.version>
@@ -1081,6 +1082,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-core_2.10</artifactId>
+                <version>${spark.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>com.github.stephenc.findbugs</groupId>
                 <artifactId>findbugs-annotations</artifactId>
                 <version>1.3.9-1</version>