Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/14 11:15:35 UTC
incubator-airflow git commit: [AIRFLOW-216] Add Sqoop Hook and Operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 8d501b0ce -> 717a4aeee
[AIRFLOW-216] Add Sqoop Hook and Operator
This patch adds a Sqoop hook and operator that implement Sqoop import and export.
The hook is a wrapper around the sqoop 1 binary.
* Closes #1576 from jwi078/sqoop_operator
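
For context, the hook shells out to the locally installed sqoop 1 binary. For a
table import with default settings, the command it builds looks roughly like the
following (the fields in angle brackets come from the Airflow connection; the
target dir and table name are purely illustrative):

    sqoop import --verbose --username <login> --password <password> \
        --connect <host>:<port>/<schema> \
        --target-dir /user/airflow/target --num-mappers 1 --as-textfile \
        --table mytable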
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/717a4aee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/717a4aee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/717a4aee
Branch: refs/heads/master
Commit: 717a4aeee3bd0883067c82a3174ad8e2b7dbe8ec
Parents: 8d501b0
Author: JohanW <jo...@gmail.com>
Authored: Tue Jun 14 13:14:55 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jun 14 13:14:59 2016 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/sqoop_hook.py | 223 +++++++++++++++++++++++
airflow/contrib/operators/sqoop_operator.py | 91 +++++++++
2 files changed, 314 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/717a4aee/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
new file mode 100644
index 0000000..f213fc4
--- /dev/null
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -0,0 +1,223 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module contains a sqoop 1 hook
+"""
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+
+import logging
+import subprocess
+
+log = logging.getLogger(__name__)
+
+
+class SqoopHook(BaseHook):
+ """
+ This Hook is a wrapper around the sqoop 1 binary. To be able to use the
+ hook, "sqoop" must be in the PATH.
+ :param hive_home: (from json) The location of hive-site.xml
+ :type hive_home: str
+ :param job_tracker: (from json) <local|jobtracker:port> specify a job tracker
+ :type job_tracker: str
+ :param namenode: (from json) specify a namenode
+ :type namenode: str
+ :param lib_jars: (from json) specify comma separated jar files to
+ include in the classpath.
+ :type lib_jars: str
+ :param files: (from json) specify comma separated files to be copied
+ to the map reduce cluster
+ :type files: str
+ :param archives: (from json) specify comma separated archives to be
+ unarchived on the compute machines.
+ :type archives: str
+ """
+ def __init__(self, conn_id='sqoop_default'):
+ conn = self.get_connection(conn_id)
+ self.hive_home = conn.extra_dejson.get('hive_home', None)
+ self.job_tracker = conn.extra_dejson.get('job_tracker', None)
+ self.namenode = conn.extra_dejson.get('namenode', None)
+ self.lib_jars = conn.extra_dejson.get('libjars', None)
+ self.files = conn.extra_dejson.get('files', None)
+ self.archives = conn.extra_dejson.get('archives', None)
+ self.conn = conn
+
+ def get_conn(self):
+ pass
+
+ def Popen(self, cmd, export=False, **kwargs):
+ """
+ Runs the sqoop command as a subprocess
+
+ :param cmd: sqoop command (list of arguments) to execute
+ :param kwargs: extra arguments to Popen (see subprocess.Popen)
+ :return: handle to subprocess
+ """
+ prefixed_cmd = self._prepare_command(cmd, export=export)
+ log.info(prefixed_cmd)
+ return subprocess.Popen(prefixed_cmd, **kwargs)
+
+ def _prepare_command(self, cmd, export=False):
+
+ connection_cmd = ""
+
+ if export:
+ connection_cmd = ["sqoop", "export", "--verbose"]
+ else:
+ connection_cmd = ["sqoop", "import", "--verbose"]
+
+ if self.job_tracker:
+ connection_cmd += ["-jt", self.job_tracker]
+ if self.conn.login:
+ connection_cmd += ["--username", self.conn.login]
+ # todo: put this in a password file
+ if self.conn.password:
+ connection_cmd += ["--password", self.conn.password]
+ if self.lib_jars:
+ connection_cmd += ["-libjars", self.lib_jars]
+ if self.files:
+ connection_cmd += ["-files", self.files]
+ if self.namenode:
+ connection_cmd += ["-fs", self.namenode]
+ if self.archives:
+ connection_cmd += ["-archives", self.archives]
+
+ connection_cmd += ["--connect", "{}:{}/{}".format(self.conn.host, self.conn.port, self.conn.schema)]
+ connection_cmd += cmd
+
+ return connection_cmd
+
+ def _import_cmd(self, target_dir,
+ append=False, type="text",
+ num_mappers=None, split_by=None):
+
+ cmd = ["--target-dir", target_dir]
+
+ if not num_mappers:
+ num_mappers = 1
+
+ cmd += ["--num-mappers", str(num_mappers)]
+
+ if split_by:
+ cmd += ["--split-by", split_by]
+
+ if append:
+ cmd += ["--append"]
+
+ if type == "avro":
+ cmd += ["--as-avrodatafile"]
+ elif type == "sequence":
+ cmd += ["--as-sequencefile"]
+ else:
+ cmd += ["--as-textfile"]
+
+ return cmd
+
+ def import_table(self, table, target_dir,
+ append=False, type="text", columns=None,
+ num_mappers=None, split_by=None, where=None):
+ """
+ Imports table from remote location to target dir. Arguments are
+ copies of direct sqoop command line arguments.
+ :param table: Table to read
+ :param target_dir: HDFS destination dir
+ :param append: Append data to an existing dataset in HDFS
+ :param type: "avro", "sequence", "text" Imports data into the specified
+ format. Defaults to text.
+ :param columns: <col,col,col...> Columns to import from table
+ :param num_mappers: Use n map tasks to import in parallel
+ :param split_by: Column of the table used to split work units
+ :param where: WHERE clause to use during import
+ """
+ cmd = self._import_cmd(target_dir, append, type,
+ num_mappers, split_by)
+ cmd += ["--table", table]
+ if columns:
+ cmd += ["--columns", columns]
+ if where:
+ cmd += ["--where", where]
+
+ p = self.Popen(cmd, export=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ output, stderr = p.communicate()
+
+ if p.returncode != 0:
+ # TODO: consider a more specific error, e.g. RemoteCalledProcessError(p.returncode, cmd, self.conn.host, output=output)
+ raise AirflowException("Cannot execute {} on {}. Error code is: "
+ "{}. Output: {}, Stderr: {}"
+ .format(cmd, self.conn.host,
+ p.returncode, output, stderr))
+
+ def _export_cmd(self, export_dir, num_mappers=None):
+
+ cmd = ["--export-dir", export_dir]
+
+ if not num_mappers:
+ num_mappers = 1
+
+ cmd += ["--num-mappers", str(num_mappers)]
+
+ return cmd
+
+ def export_table(self, table, export_dir,
+ num_mappers=None):
+ """
+ Exports Hive table to remote location. Arguments are copies of direct
+ sqoop command line arguments.
+ :param table: Target table in the remote database
+ :param export_dir: HDFS directory containing the Hive table to export
+ :param num_mappers: Use n map tasks to export in parallel
+ """
+
+ cmd = self._export_cmd(export_dir, num_mappers)
+ cmd += ["--table", table]
+
+ p = self.Popen(cmd, export=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ output, stderr = p.communicate()
+
+ if p.returncode != 0:
+ # TODO: consider a more specific error, e.g. RemoteCalledProcessError(p.returncode, cmd, self.conn.host, output=output)
+ raise AirflowException("Cannot execute {} on {}. Error code is: "
+ "{}. Output: {}, Stderr: {}"
+ .format(cmd, self.conn.host,
+ p.returncode, output, stderr))
+
+ def import_query(self, query, target_dir,
+ append=False, type="text",
+ num_mappers=None, split_by=None):
+ """
+ Imports a free-form query (instead of a table) into the target dir.
+ :param query: Free format query to run
+ :param target_dir: HDFS destination dir
+ :param append: Append data to an existing dataset in HDFS
+ :param type: "avro", "sequence", "text" Imports data into the specified
+ format. Defaults to text.
+ :param num_mappers: Use n map tasks to import in parallel
+ :param split_by: Column of the table used to split work units
+ """
+ cmd = self._import_cmd(target_dir, append, type,
+ num_mappers, split_by)
+ cmd += ["--query", query]
+
+ p = self.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ output, stderr = p.communicate()
+
+ if p.returncode != 0:
+ # TODO: consider a more specific error, e.g. RemoteCalledProcessError(p.returncode, cmd, self.conn.host, output=output)
+ raise AirflowException("Cannot execute {} on {}. Error code is: "
+ "{}. Output: {}, Stderr: {}"
+ .format(cmd, self.conn.host,
+ p.returncode, output, stderr))
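
Not part of the patch, but a minimal sketch of how the new hook could be
exercised directly. It assumes a "sqoop_default" connection with host, port and
schema filled in and, optionally, extra JSON for the keys read in __init__
(e.g. namenode, job_tracker, libjars); the table, columns and target dir below
are hypothetical:

    from airflow.contrib.hooks.sqoop_hook import SqoopHook

    # Extra JSON on the connection might look like:
    #   {"namenode": "hdfs://namenode:8020", "job_tracker": "jobtracker:8021",
    #    "libjars": "/opt/jars/extra.jar"}
    hook = SqoopHook(conn_id='sqoop_default')

    # Import a table into HDFS as text, split across 4 mappers on a key column.
    hook.import_table(table='orders',
                      target_dir='/user/airflow/orders',
                      append=False,
                      type='text',
                      columns='order_id,amount',
                      num_mappers=4,
                      split_by='order_id')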
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/717a4aee/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
new file mode 100644
index 0000000..8bf1c05
--- /dev/null
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module contains a sqoop 1 operator
+"""
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.sqoop_hook import SqoopHook
+
+
+class SqoopOperator(BaseOperator):
+ """
+ Execute a Sqoop import or export job.
+ """
+ @apply_defaults
+ def __init__(self,
+ conn_id='sqoop_default',
+ type_cmd='import',
+ table='',
+ target_dir=None,
+ append=None,
+ type=None,
+ columns=None,
+ num_mappers='1',
+ split_by=None,
+ where=None,
+ export_dir=None,
+ *args,
+ **kwargs):
+ """
+ :param conn_id: str Airflow connection id for the sqoop connection
+ :param type_cmd: str specify command to execute "export" or "import"
+ :param table: Table to read
+ :param target_dir: HDFS destination dir
+ :param append: Append data to an existing dataset in HDFS
+ :param type: "avro", "sequence", "text" Imports data into the specified
+ format. Defaults to text.
+ :param columns: <col,col,col> Columns to import from table
+ :param num_mappers: Use n map tasks to import/export in parallel
+ :param split_by: Column of the table used to split work units
+ :param where: WHERE clause to use during import
+ :param export_dir: HDFS Hive database directory to export
+ """
+ super(SqoopOperator, self).__init__(*args, **kwargs)
+ self.conn_id = conn_id
+ self.type_cmd = type_cmd
+ self.table = table
+ self.target_dir = target_dir
+ self.append = append
+ self.type = type
+ self.columns = columns
+ self.num_mappers = num_mappers
+ self.split_by = split_by
+ self.where = where
+ self.export_dir = export_dir
+
+ def execute(self, context):
+ """
+ Execute sqoop job
+ """
+ hook = SqoopHook(conn_id=self.conn_id)
+
+ if self.type_cmd == 'export':
+ hook.export_table(
+ table=self.table,
+ export_dir=self.export_dir,
+ num_mappers=self.num_mappers)
+ else:
+ hook.import_table(
+ table=self.table,
+ target_dir=self.target_dir,
+ append=self.append,
+ type=self.type,
+ columns=self.columns,
+ num_mappers=self.num_mappers,
+ split_by=self.split_by,
+ where=self.where)
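
Finally, a minimal, hypothetical DAG snippet showing how the operator might be
wired up; the dag id, task id, table and target dir are placeholders:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.sqoop_operator import SqoopOperator

    dag = DAG('sqoop_example',
              start_date=datetime(2016, 6, 14),
              schedule_interval=None)

    # Import a table from the database behind "sqoop_default" into HDFS.
    import_orders = SqoopOperator(
        task_id='import_orders',
        conn_id='sqoop_default',
        type_cmd='import',
        table='orders',
        target_dir='/user/airflow/orders',
        num_mappers='2',
        split_by='order_id',
        dag=dag)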