Posted to commits@airflow.apache.org by bo...@apache.org on 2016/07/13 07:39:59 UTC

incubator-airflow git commit: [AIRFLOW-306] Add Spark-sql Hook and Operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3c91bbb5d -> 9f764275f


[AIRFLOW-306] Add Spark-sql Hook and Operator

This patch adds a hook and operator that allow execution of Spark SQL
queries. The hook is a wrapper around the spark-sql binary.

Closes #1644 from danielvdende/spark_sql_operator
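
For illustration, a minimal DAG sketch using the new operator; the DAG id,
schedule, and query are made up for this example and are not part of the
patch:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator

    dag = DAG('spark_sql_example',                   # hypothetical DAG id
              start_date=datetime(2016, 7, 1),
              schedule_interval='@daily')

    count_rows = SparkSqlOperator(
        task_id='count_rows',
        sql="SELECT COUNT(*) FROM some_table",       # inline query, run as 'spark-sql -e ...'
        master='yarn',
        dag=dag)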


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f764275
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f764275
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f764275

Branch: refs/heads/master
Commit: 9f764275f7e6dc0fe807e5b9e657434ea799d8ce
Parents: 3c91bbb
Author: Daniel van der Ende <da...@gmail.com>
Authored: Wed Jul 13 09:39:27 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jul 13 09:39:35 2016 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_sql_hook.py         | 149 +++++++++++++++++++
 airflow/contrib/operators/spark_sql_operator.py |  91 +++++++++++
 2 files changed, 240 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f764275/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
new file mode 100644
index 0000000..ff5270b
--- /dev/null
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import subprocess
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+
+log = logging.getLogger(__name__)
+
+
+class SparkSqlHook(BaseHook):
+    """
+    This hook is a wrapper around the spark-sql binary. It requires that the
+    "spark-sql" binary is in the PATH.
+
+    :param sql: The SQL query to execute
+    :type sql: str
+    :param conf: arbitrary Spark configuration properties
+    :type conf: str (format: comma-separated PROP=VALUE pairs)
+    :param conn_id: connection_id string
+    :type conn_id: str
+    :param executor_cores: Number of cores per executor
+    :type executor_cores: int
+    :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+    :type executor_memory: str
+    :param keytab: Full path to the file that contains the keytab
+    :type keytab: str
+    :param master: spark://host:port, mesos://host:port, yarn, or local
+    :type master: str
+    :param name: Name of the job.
+    :type name: str
+    :param num_executors: Number of executors to launch
+    :type num_executors: int
+    :param verbose: Whether to pass the verbose flag to spark-sql
+    :type verbose: bool
+    :param yarn_queue: The YARN queue to submit to (Default: "default")
+    :type yarn_queue: str
+    """
+    def __init__(self,
+                 sql,
+                 conf=None,
+                 conn_id='spark_sql_default',
+                 executor_cores=None,
+                 executor_memory=None,
+                 keytab=None,
+                 master='yarn',
+                 name='default-name',
+                 num_executors=None,
+                 verbose=True,
+                 yarn_queue='default'
+                 ):
+        self._sql = sql
+        self._conf = conf
+        self._conn = self.get_connection(conn_id)
+        self._executor_cores = executor_cores
+        self._executor_memory = executor_memory
+        self._keytab = keytab
+        self._master = master
+        self._name = name
+        self._num_executors = num_executors
+        self._verbose = verbose
+        self._yarn_queue = yarn_queue
+        self._sp = None
+
+    def get_conn(self):
+        pass
+
+    def _prepare_command(self, cmd):
+        """
+        Construct the spark-sql command to execute. Verbose output is enabled
+        by default.
+
+        :param cmd: command to append to the spark-sql command
+        :type cmd: str or list[str]
+        :return: full command to be executed
+        """
+        connection_cmd = ["spark-sql"]
+        if self._conf:
+            for conf_el in self._conf.split(","):
+                connection_cmd += ["--conf", conf_el]
+        if self._executor_cores:
+            connection_cmd += ["--executor-cores", str(self._executor_cores)]
+        if self._executor_memory:
+            connection_cmd += ["--executor-memory", self._executor_memory]
+        if self._keytab:
+            connection_cmd += ["--keytab", self._keytab]
+        if self._num_executors:
+            connection_cmd += ["--num-executors", str(self._num_executors)]
+        if self._sql:
+            if self._sql.endswith('.sql'):
+                connection_cmd += ["-f", self._sql]
+            else:
+                connection_cmd += ["-e", self._sql]
+        if self._master:
+            connection_cmd += ["--master", self._master]
+        if self._name:
+            connection_cmd += ["--name", self._name]
+        if self._verbose:
+            connection_cmd += ["--verbose"]
+        if self._yarn_queue:
+            connection_cmd += ["--queue", self._yarn_queue]
+
+        # cmd may be a string of extra arguments or a pre-split list;
+        # appending a raw string would extend the list character by character
+        if isinstance(cmd, list):
+            connection_cmd += cmd
+        elif cmd:
+            connection_cmd += cmd.split()
+
+        log.debug("Spark-Sql cmd: {}".format(connection_cmd))
+
+        return connection_cmd
+
+    def run_query(self, cmd="", **kwargs):
+        """
+        Execute the Spark-sql query by running the spark-sql binary in a
+        local subprocess and streaming its output to the log
+
+        :param cmd: extra arguments to append to the spark-sql command
+        :param kwargs: extra arguments to Popen (see subprocess.Popen)
+        """
+        prefixed_cmd = self._prepare_command(cmd)
+        self._sp = subprocess.Popen(prefixed_cmd,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.PIPE,
+                                    **kwargs)
+        # Stream stdout while the process runs, then drain stderr. Note that
+        # stderr is only read once stdout is exhausted, so a job that fills
+        # the stderr pipe buffer before closing stdout can block here.
+        for line in iter(self._sp.stdout.readline, b''):
+            log.info(line.decode('utf-8').strip())
+        for line in iter(self._sp.stderr.readline, b''):
+            log.info(line.decode('utf-8').strip())
+        # Both pipes were drained above, so communicate() would only return
+        # empty strings; wait() is enough to collect the exit code.
+        self._sp.wait()
+
+        if self._sp.returncode:
+            raise AirflowException("Cannot execute {} on {}. Error code is: "
+                                   "{}".format(cmd, self._conn.host,
+                                               self._sp.returncode))
+
+    def kill(self):
+        if self._sp and self._sp.poll() is None:
+            logging.info("Killing the Spark-Sql job")
+            self._sp.kill()
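
As a rough sketch of what the hook does with made-up arguments (this assumes
a 'spark_sql_default' connection exists and spark-sql is on the PATH):

    from airflow.contrib.hooks.spark_sql_hook import SparkSqlHook

    hook = SparkSqlHook(
        sql="SELECT COUNT(*) FROM some_table",   # hypothetical query
        conf="spark.ui.enabled=false",           # comma-separated PROP=VALUE pairs
        num_executors=4)

    # _prepare_command builds roughly this invocation:
    #   spark-sql --conf spark.ui.enabled=false --num-executors 4 \
    #             -e 'SELECT COUNT(*) FROM some_table' \
    #             --master yarn --name default-name --verbose --queue default
    hook.run_query()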

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f764275/airflow/contrib/operators/spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py
new file mode 100644
index 0000000..4a2033c
--- /dev/null
+++ b/airflow/contrib/operators/spark_sql_operator.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.spark_sql_hook import SparkSqlHook
+
+
+class SparkSqlOperator(BaseOperator):
+    """
+    Execute Spark SQL query
+
+    :param sql: The SQL query to execute
+    :type sql: str
+    :param conf: arbitrary Spark configuration properties
+    :type conf: str (format: comma-separated PROP=VALUE pairs)
+    :param conn_id: connection_id string
+    :type conn_id: str
+    :param executor_cores: Number of cores per executor
+    :type executor_cores: int
+    :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+    :type executor_memory: str
+    :param keytab: Full path to the file that contains the keytab
+    :type keytab: str
+    :param master: spark://host:port, mesos://host:port, yarn, or local
+    :type master: str
+    :param name: Name of the job
+    :type name: str
+    :param num_executors: Number of executors to launch
+    :type num_executors: int
+    :param verbose: Whether to pass the verbose flag to spark-sql
+    :type verbose: bool
+    :param yarn_queue: The YARN queue to submit to (Default: "default")
+    :type yarn_queue: str
+    """
+    @apply_defaults
+    def __init__(self,
+                 sql,
+                 conf=None,
+                 conn_id='spark_sql_default',
+                 executor_cores=None,
+                 executor_memory=None,
+                 keytab=None,
+                 master='yarn',
+                 name='default-name',
+                 num_executors=None,
+                 verbose=True,
+                 yarn_queue='default',
+                 *args,
+                 **kwargs):
+        super(SparkSqlOperator, self).__init__(*args, **kwargs)
+        self._sql = sql
+        self._conf = conf
+        self._conn_id = conn_id
+        self._executor_cores = executor_cores
+        self._executor_memory = executor_memory
+        self._keytab = keytab
+        self._master = master
+        self._name = name
+        self._num_executors = num_executors
+        self._verbose = verbose
+        self._yarn_queue = yarn_queue
+        self._hook = None
+
+    def execute(self, context):
+        """
+        Call the SparkSqlHook to run the provided sql query
+        """
+        self._hook = SparkSqlHook(sql=self._sql,
+                                  conf=self._conf,
+                                  conn_id=self._conn_id,
+                                  executor_cores=self._executor_cores,
+                                  executor_memory=self._executor_memory,
+                                  keytab=self._keytab,
+                                  name=self._name,
+                                  num_executors=self._num_executors,
+                                  verbose=self._verbose,
+                                  master=self._master,
+                                  yarn_queue=self._yarn_queue
+                                  )
+        self._hook.run_query()
+
+    def on_kill(self):
+        # execute() may not have run yet, in which case there is no hook
+        if self._hook:
+            self._hook.kill()
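
Note that the hook inspects the sql argument: a value ending in '.sql' is
passed to spark-sql with '-f' (run as a script file), anything else with
'-e' (run inline). Reusing the dag object from the sketch at the top of this
message, with a hypothetical file path:

    run_script = SparkSqlOperator(
        task_id='run_script',
        sql='/path/to/query.sql',   # ends in '.sql', so run as 'spark-sql -f /path/to/query.sql'
        master='yarn',
        dag=dag)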