You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/07/01 16:56:25 UTC

falcon git commit: FALCON-1938 Add support to execute Spark SQL process

Repository: falcon
Updated Branches:
  refs/heads/master bb23ccf08 -> c12c999b4


FALCON-1938 Add support to execute Spark SQL process

Author: peeyush b <pb...@hortonworks.com>

Reviewers: "Venkat Ranganathan  <ve...@hortonworks.com>"

Closes #188 from peeyushb/FALCON-1938


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c12c999b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c12c999b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c12c999b

Branch: refs/heads/master
Commit: c12c999b4cc89bd3fbe24873567f441eed02a4ef
Parents: bb23ccf
Author: peeyush b <pb...@hortonworks.com>
Authored: Fri Jul 1 09:56:21 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Fri Jul 1 09:56:21 2016 -0700

----------------------------------------------------------------------
 docs/src/site/twiki/EntitySpecification.twiki   | 23 ++++++
 examples/entity/spark/spark-sql-process.xml     | 55 +++++++++++++++
 examples/pom.xml                                | 10 +++
 .../example/spark/SparkSQLProcessTable.java     | 51 ++++++++++++++
 .../process/SparkProcessWorkflowBuilder.java    |  9 +++
 .../OozieProcessWorkflowBuilderTest.java        | 73 ++++++++++++++++++++
 .../config/process/spark-sql-process.xml        | 53 ++++++++++++++
 7 files changed, 274 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c12c999b/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 278dc0e..9f9e210 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -927,6 +927,29 @@ Input and Output data to the Spark application will be set as argument when Spar
 In the set of arguments, first argument will always correspond to input feed, second argument will always correspond to output feed and then user's provided argument will be set.
 
 
+To run a Spark SQL process entity that reads and writes data stored in Hive, the datanucleus jars under the $HIVE_HOME/lib directory and the hive-site.xml
+under the $SPARK_HOME/conf/ directory need to be available on the driver and all executors launched by the YARN cluster.
+A convenient way to do this is to add them through the --jars and --files options of the spark-opts attribute.
+Example:
+<verbatim>
+<process name="spark-process">
+...
+    <workflow engine="spark" path="/resources/action"/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.examples.SparkSQLProcessTable</class>
+        <jar>/resources/action/lib/spark-application.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --jars /usr/local/hive/lib/datanucleus-rdbms.jar,/usr/local/hive/lib/datanucleus-core.jar,/usr/local/hive/lib/datanucleus-api-jdo.jar --files /usr/local/spark/conf/hive-site.xml</spark-opts>
+    </spark-attributes>
+...
+</process>
+</verbatim>
+
+Input and output for the Spark SQL application are passed as arguments when the Spark workflow is generated, if input and output feed entities are defined in the process entity.
+If the input feed is of table type, the input table partition filter, table name and database name are set as input arguments, in that order. If the output feed is of table type, the output table partitions, table name and database name are set as output arguments, in that order.
+Once the input and output arguments are set, the user-provided arguments follow.
+
 ---+++ Retry
 Retry policy defines how the workflow failures should be handled. Three retry policies are defined: periodic, exp-backoff(exponential backoff) and final. Depending on the delay and number of attempts, the workflow is re-tried after specific intervals. If user sets the onTimeout attribute to "true", retries will happen for TIMED_OUT instances.
 Syntax:

http://git-wip-us.apache.org/repos/asf/falcon/blob/c12c999b/examples/entity/spark/spark-sql-process.xml
----------------------------------------------------------------------
diff --git a/examples/entity/spark/spark-sql-process.xml b/examples/entity/spark/spark-sql-process.xml
new file mode 100644
index 0000000..cdd2ccc
--- /dev/null
+++ b/examples/entity/spark/spark-sql-process.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="hcat-local">
+            <validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <!-- The input table details are passed to the Spark application as arguments: partition filter, table name, database name -->
+        <input name="inparts" feed="hcat-in" start="now(0,-5)" end="now(0,-1)"/>
+    </inputs>
+
+    <outputs>
+        <!-- The output table details are passed to the Spark application as arguments: partitions, table name, database name -->
+        <output name="outpart" feed="hcat-out" instance="now(0,0)"/>
+    </outputs>
+
+    <workflow engine="spark" path="/app/spark"/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
+        <jar>/app/spark/lib/falcon-examples.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+    </spark-attributes>
+
+    <retry policy="periodic" delay="minutes(3)" attempts="3"/>
+
+</process>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/c12c999b/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 5e1dea4..4ae9be2 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -64,6 +64,16 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c12c999b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
new file mode 100644
index 0000000..5e9f092
--- /dev/null
+++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.example.spark;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+/**
+ * Spark SQL Example.
+ */
+
+public final class SparkSQLProcessTable {
+
+    private SparkSQLProcessTable() {
+    }
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.out.println("Arguments must contain details for input or output table");
+            System.exit(0);
+        }
+
+        SparkConf conf = new SparkConf().setAppName("SparkSQL example");
+        SparkContext sc = new SparkContext(conf);
+        HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
+
+        String sqlQuery = "FROM " +args[2]+"."+args[1]+ " INSERT OVERWRITE TABLE " +args[5]+"."+args[4]
+                +" PARTITION("+args[3]+")  SELECT word, SUM(cnt) AS cnt WHERE "+args[0]+" GROUP BY word";
+
+        DataFrame df = sqlContext.sql(sqlQuery);
+        df.show();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/c12c999b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
index 8c06711..5f4fafa 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
@@ -46,6 +46,7 @@ import java.util.List;
  */
 public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
     private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml";
+    private static final String FALCON_PREFIX = "falcon_";
 
     public SparkProcessWorkflowBuilder(Process entity) {
         super(entity);
@@ -155,6 +156,10 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
             final String inputName = input.getName();
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                 argList.add(0, "${" + inputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                argList.add(0, "${" + FALCON_PREFIX+inputName+"_database" + "}");
+                argList.add(0, "${" + FALCON_PREFIX+inputName+"_table" + "}");
+                argList.add(0, "${" + FALCON_PREFIX+inputName+"_partition_filter_hive" + "}");
             }
             numInputFeed--;
         }
@@ -174,6 +179,10 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
             final String outputName = output.getName();
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                 argList.add(0, "${" + outputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                argList.add(0, "${" + FALCON_PREFIX+outputName+"_database" + "}");
+                argList.add(0, "${" + FALCON_PREFIX+outputName+"_table" + "}");
+                argList.add(0, "${" + FALCON_PREFIX+outputName+"_partitions_hive" + "}");
             }
             numOutputFeed--;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c12c999b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 85100e7..30ff537 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -326,6 +326,79 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
     }
 
     @Test
+    public void testSparkSQLProcess() throws Exception {
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/spark-sql-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
+
+        // verify table and hive props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
+        expected.putAll(ClusterHelper.getHiveProperties(cluster));
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
+        testParentWorkflow(process, parentWorkflow);
+
+        ACTION sparkNode = getAction(parentWorkflow, "user-action");
+
+        JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement =
+                OozieUtils.unMarshalSparkAction(sparkNode);
+        org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
+
+        assertEquals(sparkAction.getMaster(), "local");
+        assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar");
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        List<String> argsList = sparkAction.getArg();
+
+        Input input = process.getInputs().getInputs().get(0);
+        Output output = process.getOutputs().getOutputs().get(0);
+
+        assertEquals(argsList.get(0), "${falcon_"+input.getName()+"_partition_filter_hive}");
+        assertEquals(argsList.get(1), "${falcon_"+input.getName()+"_table}");
+        assertEquals(argsList.get(2), "${falcon_"+input.getName()+"_database}");
+        assertEquals(argsList.get(3), "${falcon_"+output.getName()+"_partitions_hive}");
+        assertEquals(argsList.get(4), "${falcon_"+output.getName()+"_table}");
+        assertEquals(argsList.get(5), "${falcon_"+output.getName()+"_database}");
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    @Test
     public void testSparkProcess() throws Exception {
 
         URL resource = this.getClass().getResource(SPARK_PROCESS_XML);

http://git-wip-us.apache.org/repos/asf/falcon/blob/c12c999b/oozie/src/test/resources/config/process/spark-sql-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/spark-sql-process.xml b/oozie/src/test/resources/config/process/spark-sql-process.xml
new file mode 100644
index 0000000..55ff89b
--- /dev/null
+++ b/oozie/src/test/resources/config/process/spark-sql-process.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <workflow engine="spark" path="/resources/action"/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
+        <jar>/resources/action/lib/falcon-examples.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+    </spark-attributes>
+
+    <retry policy="periodic" delay="minutes(3)" attempts="3"/>
+
+</process>
\ No newline at end of file