You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/28 23:48:05 UTC

incubator-airflow git commit: [AIRFLOW-985] Extend the sqoop operator and hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4c0905068 -> 82eb20e9f


[AIRFLOW-985] Extend the sqoop operator and hook

The sqoop operator was a bit outdated and needed
some rework
including tests. Many lines have changed because
the code needed
some restructuring for better testing. Removed the
hive_home and
job_tracker because they are not used in any way
inside of the
sqoop class. Moved the num-mappers argument to the
constructor
because it is used for both importing and
exporting. Added
support for parquet. Added the ability to set the
driver and direct
mode and ability to pass jvm parameters to sqoop.

Closes #2177 from Fokko/airflow-985-extend-sqoop-
operator-hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/82eb20e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/82eb20e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/82eb20e9

Branch: refs/heads/master
Commit: 82eb20e9f525c09b7d8b4eea896dedcfb6b04f28
Parents: 4c09050
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Tue Mar 28 16:47:57 2017 -0700
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Mar 28 16:47:57 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/sqoop_hook.py         | 302 ++++++++++++++---------
 airflow/contrib/operators/sqoop_operator.py | 135 ++++++++--
 airflow/utils/db.py                         |   4 +
 tests/contrib/hooks/sqoop_hook.py           | 219 ++++++++++++++++
 tests/contrib/operators/sqoop_operator.py   |  93 +++++++
 5 files changed, 613 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/82eb20e9/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index ef25b7d..e1f4779 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -14,15 +14,15 @@
 #
 
 """
-This module contains a sqoop 1 hook
+This module contains a sqoop 1.x hook
 """
 
-from airflow.hooks.base_hook import BaseHook
-from airflow.exceptions import AirflowException
-
 import logging
 import subprocess
 
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+
 log = logging.getLogger(__name__)
 
 
@@ -30,36 +30,42 @@ class SqoopHook(BaseHook):
     """
     This Hook is a wrapper around the sqoop 1 binary. To be able to use te hook
     it is required that "sqoop" is in the PATH.
-    :param hive_home: (from json) The location of hive-site.xml
-    :type hive_home: str
-    :param job_tracker: (from json) <local|jobtracker:port> specify a job tracker
+    :param job_tracker: (from json) specify a job tracker local|jobtracker:port
     :type job_tracker: str
     :param namenode: (from json) specify a namenode
     :type namenode: str
-    :param lib_jars: (from json) specify comma separated jar files to
-        include in the classpath.
+    :param lib_jars: (from json) specify comma separated jar
+        files to include in the classpath.
     :type lib_jars: str
-    :param files: (from json) specify comma separated files to be copied
-        to the map reduce cluster
+    :param files: (from json) specify comma separated files to be copied to
+        the map reduce cluster
     :type files: (from json) str
     :param archives: (from json)  specify comma separated archives to be
         unarchived on the compute machines.
     :type archives: str
     """
-    def __init__(self, conn_id='sqoop_default'):
-        conn = self.get_connection(conn_id)
-        self.hive_home = conn.extra_dejson.get('hive_home', None)
-        self.job_tracker = conn.extra_dejson.get('job_tracker', None)
-        self.namenode = conn.extra_dejson.get('namenode', None)
-        self.lib_jars = conn.extra_dejson.get('libjars', None)
-        self.files = conn.extra_dejson.get('files', None)
-        self.archives = conn.extra_dejson.get('archives', None)
-        self.conn = conn
+
+    def __init__(self, conn_id='sqoop_default', verbose=False,
+                 num_mappers=None, properties=None):
+        # No mutable types in the default parameters
+        if properties is None:
+            properties = {}
+        self.conn = self.get_connection(conn_id)
+        connection_parameters = self.conn.extra_dejson
+        self.job_tracker = connection_parameters.get('job_tracker', None)
+        self.namenode = connection_parameters.get('namenode', None)
+        self.libjars = connection_parameters.get('libjars', None)
+        self.files = connection_parameters.get('files', None)
+        self.archives = connection_parameters.get('archives', None)
+        self.password_file = connection_parameters.get('password_file', None)
+        self.verbose = verbose
+        self.num_mappers = str(num_mappers)
+        self.properties = properties
 
     def get_conn(self):
         pass
 
-    def Popen(self, cmd, export=False, **kwargs):  # noqa
+    def Popen(self, cmd, **kwargs):
         """
         Remote Popen
 
@@ -67,157 +73,223 @@ class SqoopHook(BaseHook):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         :return: handle to subprocess
         """
-        prefixed_cmd = self._prepare_command(cmd, export=export)
-        return subprocess.Popen(prefixed_cmd, **kwargs)
-
-    def _prepare_command(self, cmd, export=False):
+        process = subprocess.Popen(cmd,
+                                   stdout=subprocess.PIPE,
+                                   stderr=subprocess.PIPE,
+                                   **kwargs)
+        output, stderr = process.communicate()
+
+        if process.returncode != 0:
+            raise AirflowException((
+                                       "Cannot execute {} on {}. Error code is: {}"
+                                       "Output: {}, Stderr: {}"
+                                   ).format(cmd, self.conn.host,
+                                            process.returncode, output,
+                                            stderr))
+
+    def _prepare_command(self, export=False):
         if export:
-            connection_cmd = ["sqoop", "export", "--verbose"]
+            connection_cmd = ["sqoop", "export"]
         else:
-            connection_cmd = ["sqoop", "import", "--verbose"]
+            connection_cmd = ["sqoop", "import"]
 
+        if self.verbose:
+            connection_cmd += ["--verbose"]
         if self.job_tracker:
             connection_cmd += ["-jt", self.job_tracker]
         if self.conn.login:
             connection_cmd += ["--username", self.conn.login]
-        # todo: put this in a password file
         if self.conn.password:
             connection_cmd += ["--password", self.conn.password]
-        if self.lib_jars:
-            connection_cmd += ["-libjars", self.lib_jars]
+        if self.password_file:
+            connection_cmd += ["--password-file", self.password_file]
+        if self.libjars:
+            connection_cmd += ["-libjars", self.libjars]
         if self.files:
             connection_cmd += ["-files", self.files]
         if self.namenode:
             connection_cmd += ["-fs", self.namenode]
         if self.archives:
             connection_cmd += ["-archives", self.archives]
+        if self.num_mappers:
+            connection_cmd += ["--num-mappers", self.num_mappers]
+
+        for key, value in self.properties.items():
+            connection_cmd += ["-D", "{}={}".format(key, value)]
 
-        connection_cmd += ["--connect", "{}:{}/{}".format(self.conn.host, self.conn.port,
-                                                          self.conn.schema)]
-        connection_cmd += cmd
+        connection_cmd += ["--connect", "{}:{}/{}".format(
+            self.conn.host,
+            self.conn.port,
+            self.conn.schema
+        )]
 
         return connection_cmd
 
-    def _import_cmd(self, target_dir,
-                    append=False, type="text",
-                    num_mappers=None, split_by=None):
+    def _get_export_format_argument(self, file_type='text'):
+        if file_type == "avro":
+            return ["--as-avrodatafile"]
+        elif file_type == "sequence":
+            return ["--as-sequencefile"]
+        elif file_type == "parquet":
+            return ["--as-parquetfile"]
+        else:
+            return ["--as-textfile"]
+
+    def _import_cmd(self, target_dir, append, file_type, split_by, direct,
+                    driver):
 
-        cmd = ["--target-dir", target_dir]
+        cmd = self._prepare_command(export=False)
 
-        if not num_mappers:
-            num_mappers = 1
+        cmd += ["--target-dir", target_dir]
+
+        if append:
+            cmd += ["--append"]
 
-        cmd += ["--num-mappers", str(num_mappers)]
+        cmd += self._get_export_format_argument(file_type)
 
         if split_by:
             cmd += ["--split-by", split_by]
 
-        if append:
-            cmd += ["--append"]
+        if direct:
+            cmd += ["--direct"]
 
-        if type == "avro":
-            cmd += ["--as-avrodatafile"]
-        elif type == "sequence":
-            cmd += ["--as-sequencefile"]
-        else:
-            cmd += ["--as-textfile"]
+        if driver:
+            cmd += ["--driver", driver]
 
         return cmd
 
-    def import_table(self, table, target_dir,
-                     append=False, type="text", columns=None,
-                     num_mappers=None, split_by=None, where=None):
+    def import_table(self, table, target_dir, append=False, file_type="text",
+                     columns=None, split_by=None, where=None, direct=False,
+                     driver=None):
         """
         Imports table from remote location to target dir. Arguments are
         copies of direct sqoop command line arguments
         :param table: Table to read
         :param target_dir: HDFS destination dir
         :param append: Append data to an existing dataset in HDFS
-        :param type: "avro", "sequence", "text" Imports data to into the specified
-            format. Defaults to text.
+        :param file_type: "avro", "sequence", "text" or "parquet".
+            Imports data to into the specified format. Defaults to text.
         :param columns: <col,col,col\u2026> Columns to import from table
-        :param num_mappers: Use n map tasks to import in parallel
         :param split_by: Column of the table used to split work units
         :param where: WHERE clause to use during import
+        :param direct: Use direct connector if exists for the database
+        :param driver: Manually specify JDBC driver class to use
         """
-        cmd = self._import_cmd(target_dir, append, type,
-                               num_mappers, split_by)
+        cmd = self._import_cmd(target_dir, append, file_type, split_by, direct,
+                               driver)
+
         cmd += ["--table", table]
+
         if columns:
             cmd += ["--columns", columns]
         if where:
             cmd += ["--where", where]
 
-        p = self.Popen(cmd, export=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        output, stderr = p.communicate()
+        self.Popen(cmd)
 
-        if p.returncode != 0:
-            # I like this better: RemoteCalledProcessError(p.returncode, cmd, self.host,
-            # output=output)
-            raise AirflowException("Cannot execute {} on {}. Error code is: "
-                                   "{}. Output: {}, Stderr: {}"
-                                   .format(cmd, self.conn.host,
-                                           p.returncode, output, stderr))
+    def import_query(self, query, target_dir,
+                     append=False, file_type="text",
+                     split_by=None, direct=None, driver=None):
+        """
+        Imports a specific query from the rdbms to hdfs
+        :param query: Free format query to run
+        :param target_dir: HDFS destination dir
+        :param append: Append data to an existing dataset in HDFS
+        :param file_type: "avro", "sequence", "text" or "parquet"
+            Imports data to hdfs into the specified format. Defaults to text.
+        :param split_by: Column of the table used to split work units
+        :param direct: Use direct import fast path
+        :param driver: Manually specify JDBC driver class to use
+        """
+        cmd = self._import_cmd(target_dir, append, file_type, split_by, direct,
+                               driver)
+        cmd += ["--query", query]
 
-    def _export_cmd(self, export_dir, num_mappers=None):
+        self.Popen(cmd)
 
-        cmd = ["--export-dir", export_dir]
+    def _export_cmd(self, table, export_dir, input_null_string,
+                    input_null_non_string, staging_table, clear_staging_table,
+                    enclosed_by, escaped_by, input_fields_terminated_by,
+                    input_lines_terminated_by, input_optionally_enclosed_by,
+                    batch, relaxed_isolation):
 
-        if not num_mappers:
-            num_mappers = 1
+        cmd = self._prepare_command(export=True)
 
-        cmd += ["--num-mappers", str(num_mappers)]
+        if input_null_string:
+            cmd += ["--input-null-string", input_null_string]
 
-        return cmd
+        if input_null_non_string:
+            cmd += ["--input-null-non-string", input_null_non_string]
 
-    def export_table(self, table, export_dir,
-                     num_mappers=None):
-        """
-        Exports Hive table to remote location. Arguments are copies of direct
-        sqoop command line Arguments
-        :param table: Table remote destination
-        :param export_dir: Hive table to export
-        :param num_mappers: Use n map tasks to import in parallel
-        """
+        if staging_table:
+            cmd += ["--staging-table", staging_table]
 
-        cmd = self._export_cmd(export_dir, num_mappers)
-        cmd += ["--table", table]
+        if clear_staging_table:
+            cmd += ["--clear-staging-table"]
 
-        p = self.Popen(cmd, export=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        output, stderr = p.communicate()
+        if enclosed_by:
+            cmd += ["--enclosed-by", enclosed_by]
 
-        if p.returncode != 0:
-            # I like this better: RemoteCalledProcessError(p.returncode, cmd, self.host,
-            # output=output)
-            raise AirflowException("Cannot execute {} on {}. Error code is: "
-                                   "{}. Output: {}, Stderr: {}"
-                                   .format(cmd, self.conn.host,
-                                           p.returncode, output, stderr))
+        if escaped_by:
+            cmd += ["--escaped-by", escaped_by]
 
-    def import_query(self, query, target_dir,
-                     append=False, type="text",
-                     num_mappers=None, split_by=None):
-        """
+        if input_fields_terminated_by:
+            cmd += ["--input-fields-terminated-by", input_fields_terminated_by]
 
-        :param query: Free format query to run
-        :param target_dir: HDFS destination dir
-        :param append: Append data to an existing dataset in HDFS
-        :param type: "avro", "sequence", "text" Imports data to into the specified
-            format. Defaults to text.
-        :param num_mappers: Use n map tasks to import in parallel
-        :param split_by: Column of the table used to split work units
-        """
-        cmd = self._import_cmd(target_dir, append, type,
-                               num_mappers, split_by)
-        cmd += ["--query", query]
+        if input_lines_terminated_by:
+            cmd += ["--input-lines-terminated-by", input_lines_terminated_by]
+
+        if input_optionally_enclosed_by:
+            cmd += ["--input-optionally-enclosed-by",
+                    input_optionally_enclosed_by]
+
+        if batch:
+            cmd += ["--batch"]
+
+        if relaxed_isolation:
+            cmd += ["--relaxed-isolation"]
 
-        p = self.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        output, stderr = p.communicate()
+        # The required options
+        cmd += ["--export-dir", export_dir]
+        cmd += ["--table", table]
+
+        return cmd
 
-        if p.returncode != 0:
-            # I like this better: RemoteCalledProcessError(p.returncode, cmd, self.host,
-            # output=output)
-            raise AirflowException("Cannot execute {} on {}. Error code is: "
-                                   "{}. Output: {}, Stderr: {}"
-                                   .format(cmd, self.conn.host,
-                                           p.returncode, output, stderr))
+    def export_table(self, table, export_dir, input_null_string,
+                     input_null_non_string, staging_table,
+                     clear_staging_table, enclosed_by,
+                     escaped_by, input_fields_terminated_by,
+                     input_lines_terminated_by,
+                     input_optionally_enclosed_by, batch,
+                     relaxed_isolation):
+        """
+        Exports Hive table to remote location. Arguments are copies of direct
+        sqoop command line Arguments
+        :param table: Table remote destination
+        :param export_dir: Hive table to export
+        :param input_null_string: The string to be interpreted as null for
+            string columns
+        :param input_null_non_string: The string to be interpreted as null
+            for non-string columns
+        :param staging_table: The table in which data will be staged before
+            being inserted into the destination table
+        :param clear_staging_table: Indicate that any data present in the
+            staging table can be deleted
+        :param enclosed_by: Sets a required field enclosing character
+        :param escaped_by: Sets the escape character
+        :param input_fields_terminated_by: Sets the field separator character
+        :param input_lines_terminated_by: Sets the end-of-line character
+        :param input_optionally_enclosed_by: Sets a field enclosing character
+        :param batch: Use batch mode for underlying statement execution
+        :param relaxed_isolation: Transaction isolation to read uncommitted
+            for the mappers
+        """
+        cmd = self._export_cmd(table, export_dir, input_null_string,
+                               input_null_non_string, staging_table,
+                               clear_staging_table, enclosed_by, escaped_by,
+                               input_fields_terminated_by,
+                               input_lines_terminated_by,
+                               input_optionally_enclosed_by, batch,
+                               relaxed_isolation)
+
+        self.Popen(cmd)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/82eb20e9/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index d483e37..3dd9403 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -17,75 +17,160 @@
 This module contains a sqoop 1 operator
 """
 
+from airflow.contrib.hooks.sqoop_hook import SqoopHook
+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.sqoop_hook import SqoopHook
 
 
 class SqoopOperator(BaseOperator):
     """
     execute sqoop job
     """
+
     @apply_defaults
     def __init__(self,
                  conn_id='sqoop_default',
-                 type_cmd='import',
-                 table='',
+                 cmd_type='import',
+                 table=None,
+                 query=None,
                  target_dir=None,
                  append=None,
-                 type=None,
+                 file_type=None,
                  columns=None,
-                 num_mappers='1',
+                 num_mappers=None,
                  split_by=None,
                  where=None,
                  export_dir=None,
+                 input_null_string=None,
+                 input_null_non_string=None,
+                 staging_table=None,
+                 clear_staging_table=False,
+                 enclosed_by=None,
+                 escaped_by=None,
+                 input_fields_terminated_by=None,
+                 input_lines_terminated_by=None,
+                 input_optionally_enclosed_by=None,
+                 batch=False,
+                 direct=False,
+                 driver=None,
+                 verbose=False,
+                 relaxed_isolation=False,
+                 properties=None,
                  *args,
                  **kwargs):
         """
         :param conn_id: str
-        :param type_cmd: str specify command to execute "export" or "import"
+        :param cmd_type: str specify command to execute "export" or "import"
         :param table: Table to read
-        :param target_dir: HDFS destination dir
+        :param target_dir: HDFS destination directory where the data
+            from the rdbms will be written
         :param append: Append data to an existing dataset in HDFS
-        :param type: "avro", "sequence", "text" Imports data to into the specified
-           format. Defaults to text.
+        :param file_type: "avro", "sequence", "text" Imports data to
+            into the specified format. Defaults to text.
         :param columns: <col,col,col> Columns to import from table
-        :param num_mappers: U n map task to import/export in parallel
+        :param num_mappers: Use n mapper tasks to import/export in parallel
         :param split_by: Column of the table used to split work units
         :param where: WHERE clause to use during import
-        :param export_dir: HDFS Hive database directory to export
+        :param export_dir: HDFS Hive database directory to export to the rdbms
+        :param input_null_string: The string to be interpreted as null
+            for string columns
+        :param input_null_non_string: The string to be interpreted as null
+            for non-string columns
+        :param staging_table: The table in which data will be staged before
+            being inserted into the destination table
+        :param clear_staging_table: Indicate that any data present in the
+            staging table can be deleted
+        :param enclosed_by: Sets a required field enclosing character
+        :param escaped_by: Sets the escape character
+        :param input_fields_terminated_by: Sets the input field separator
+        :param input_lines_terminated_by: Sets the input end-of-line character
+        :param input_optionally_enclosed_by: Sets a field enclosing character
+        :param batch: Use batch mode for underlying statement execution
+        :param direct: Use direct export fast path
+        :param driver: Manually specify JDBC driver class to use
+        :param verbose: Switch to more verbose logging for debug purposes
+        :param relaxed_isolation: use read uncommitted isolation level
+        :param properties: additional JVM properties passed to sqoop
         """
         super(SqoopOperator, self).__init__(*args, **kwargs)
         self.conn_id = conn_id
-        self.type_cmd = type_cmd
+        self.cmd_type = cmd_type
         self.table = table
+        self.query = query
         self.target_dir = target_dir
         self.append = append
-        self.type = type
+        self.file_type = file_type
         self.columns = columns
         self.num_mappers = num_mappers
         self.split_by = split_by
         self.where = where
         self.export_dir = export_dir
+        self.input_null_string = input_null_string
+        self.input_null_non_string = input_null_non_string
+        self.staging_table = staging_table
+        self.clear_staging_table = clear_staging_table
+        self.enclosed_by = enclosed_by
+        self.escaped_by = escaped_by
+        self.input_fields_terminated_by = input_fields_terminated_by
+        self.input_lines_terminated_by = input_lines_terminated_by
+        self.input_optionally_enclosed_by = input_optionally_enclosed_by
+        self.batch = batch
+        self.direct = direct
+        self.driver = driver
+        self.verbose = verbose
+        self.relaxed_isolation = relaxed_isolation
+        # No mutable types in the default parameters
+        if properties is None:
+            properties = {}
+        self.properties = properties
 
     def execute(self, context):
         """
         Execute sqoop job
         """
-        hook = SqoopHook(conn_id=self.conn_id)
+        hook = SqoopHook(conn_id=self.conn_id,
+                         verbose=self.verbose,
+                         num_mappers=self.num_mappers,
+                         properties=self.properties)
 
-        if self.type_cmd is 'export':
+        if self.cmd_type is 'export':
             hook.export_table(
                 table=self.table,
                 export_dir=self.export_dir,
-                num_mappers=self.num_mappers)
+                input_null_string=self.input_null_string,
+                input_null_non_string=self.input_null_non_string,
+                staging_table=self.staging_table,
+                clear_staging_table=self.clear_staging_table,
+                enclosed_by=self.enclosed_by,
+                escaped_by=self.escaped_by,
+                input_fields_terminated_by=self.input_fields_terminated_by,
+                input_lines_terminated_by=self.input_lines_terminated_by,
+                input_optionally_enclosed_by=self.input_optionally_enclosed_by,
+                batch=self.batch,
+                relaxed_isolation=self.relaxed_isolation)
         else:
-            hook.import_table(
-                table=self.table,
-                target_dir=self.target_dir,
-                append=self.append,
-                type=self.type,
-                columns=self.columns,
-                num_mappers=self.num_mappers,
-                split_by=self.split_by,
-                where=self.where)
+            if not self.table:
+                hook.import_table(
+                    table=self.table,
+                    target_dir=self.target_dir,
+                    append=self.append,
+                    file_type=self.file_type,
+                    columns=self.columns,
+                    split_by=self.split_by,
+                    where=self.where,
+                    direct=self.direct,
+                    driver=self.driver)
+            elif not self.query:
+                hook.import_query(
+                    query=self.table,
+                    target_dir=self.target_dir,
+                    append=self.append,
+                    file_type=self.file_type,
+                    split_by=self.split_by,
+                    direct=self.direct,
+                    driver=self.driver)
+            else:
+                raise AirflowException(
+                    "Provide query or table parameter to import using Sqoop"
+                )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/82eb20e9/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 618e0020..a619a41 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -196,6 +196,10 @@ def initdb():
             extra='{"db": 0}'))
     merge_conn(
         models.Connection(
+            conn_id='sqoop_default', conn_type='sqoop',
+            host='rmdbs', extra=''))
+    merge_conn(
+        models.Connection(
             conn_id='emr_default', conn_type='emr',
             extra='''
                 {   "Name": "default_job_flow_name",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/82eb20e9/tests/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/sqoop_hook.py b/tests/contrib/hooks/sqoop_hook.py
new file mode 100644
index 0000000..1d85e43
--- /dev/null
+++ b/tests/contrib/hooks/sqoop_hook.py
@@ -0,0 +1,219 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import unittest
+from exceptions import OSError
+
+from airflow import configuration, models
+from airflow.contrib.hooks.sqoop_hook import SqoopHook
+from airflow.utils import db
+
+
+class TestSqoopHook(unittest.TestCase):
+    _config = {
+        'conn_id': 'sqoop_test',
+        'num_mappers': 22,
+        'verbose': True,
+        'properties': {
+            'mapred.map.max.attempts': '1'
+        }
+    }
+    _config_export = {
+        'table': 'domino.export_data_to',
+        'export_dir': '/hdfs/data/to/be/exported',
+        'input_null_string': '\n',
+        'input_null_non_string': '\t',
+        'staging_table': 'database.staging',
+        'clear_staging_table': True,
+        'enclosed_by': '"',
+        'escaped_by': '\\',
+        'input_fields_terminated_by': '|',
+        'input_lines_terminated_by': '\n',
+        'input_optionally_enclosed_by': '"',
+        'batch': True,
+        'relaxed_isolation': True
+    }
+    _config_import = {
+        'target_dir': '/hdfs/data/target/location',
+        'append': True,
+        'file_type': 'parquet',
+        'split_by': '\n',
+        'direct': True,
+        'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver'
+    }
+
+    _config_json = {
+        'namenode': 'http://0.0.0.0:50070/',
+        'job_tracker': 'http://0.0.0.0:50030/',
+        'libjars': '/path/to/jars',
+        'files': '/path/to/files',
+        'archives': '/path/to/archives'
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='sqoop_test', conn_type='sqoop',
+                host='rmdbs', port=5050, extra=json.dumps(self._config_json)
+            )
+        )
+
+    def test_popen(self):
+        hook = SqoopHook(**self._config)
+
+        # Should go well
+        hook.Popen(['ls'])
+
+        # Should give an exception
+        with self.assertRaises(OSError):
+            hook.Popen('exit 1')
+
+    def test_submit(self):
+        hook = SqoopHook(**self._config)
+
+        cmd = ' '.join(hook._prepare_command())
+
+        # Check if the config has been extracted from the json
+        if self._config_json['namenode']:
+            assert "-fs {}".format(self._config_json['namenode']) in cmd
+
+        if self._config_json['job_tracker']:
+            assert "-jt {}".format(self._config_json['job_tracker']) in cmd
+
+        if self._config_json['libjars']:
+            assert "-libjars {}".format(self._config_json['libjars']) in cmd
+
+        if self._config_json['files']:
+            assert "-files {}".format(self._config_json['files']) in cmd
+
+        if self._config_json['archives']:
+            assert "-archives {}".format(self._config_json['archives']) in cmd
+
+        # Check the regulator stuff passed by the default constructor
+        if self._config['verbose']:
+            assert "--verbose" in cmd
+
+        if self._config['num_mappers']:
+            assert "--num-mappers {}".format(
+                self._config['num_mappers']) in cmd
+
+        print(self._config['properties'])
+        for key, value in self._config['properties'].items():
+            assert "-D {}={}".format(key, value) in cmd
+
+        # We don't have the sqoop binary available, and this is hard to mock,
+        # so just accept an exception for now.
+        with self.assertRaises(OSError):
+            hook.export_table(**self._config_export)
+
+        with self.assertRaises(OSError):
+            hook.import_table(table='schema.table',
+                              target_dir='/sqoop/example/path')
+
+        with self.assertRaises(OSError):
+            hook.import_query(query='SELECT * FROM sometable',
+                              target_dir='/sqoop/example/path')
+
+    def test_export_cmd(self):
+        hook = SqoopHook()
+
+        # The subprocess requires an array but we build the cmd by joining on a space
+        cmd = ' '.join(
+            hook._export_cmd(
+                self._config_export['table'],
+                self._config_export['export_dir'],
+                input_null_string=self._config_export['input_null_string'],
+                input_null_non_string=self._config_export[
+                    'input_null_non_string'],
+                staging_table=self._config_export['staging_table'],
+                clear_staging_table=self._config_export['clear_staging_table'],
+                enclosed_by=self._config_export['enclosed_by'],
+                escaped_by=self._config_export['escaped_by'],
+                input_fields_terminated_by=self._config_export[
+                    'input_fields_terminated_by'],
+                input_lines_terminated_by=self._config_export[
+                    'input_lines_terminated_by'],
+                input_optionally_enclosed_by=self._config_export[
+                    'input_optionally_enclosed_by'],
+                batch=self._config_export['batch'],
+                relaxed_isolation=self._config_export['relaxed_isolation'])
+        )
+
+        assert "--input-null-string {}".format(
+            self._config_export['input_null_string']) in cmd
+        assert "--input-null-non-string {}".format(
+            self._config_export['input_null_non_string']) in cmd
+        assert "--staging-table {}".format(
+            self._config_export['staging_table']) in cmd
+        assert "--enclosed-by {}".format(
+            self._config_export['enclosed_by']) in cmd
+        assert "--escaped-by {}".format(
+            self._config_export['escaped_by']) in cmd
+        assert "--input-fields-terminated-by {}".format(
+            self._config_export['input_fields_terminated_by']) in cmd
+        assert "--input-lines-terminated-by {}".format(
+            self._config_export['input_lines_terminated_by']) in cmd
+        assert "--input-optionally-enclosed-by {}".format(
+            self._config_export['input_optionally_enclosed_by']) in cmd
+
+        if self._config_export['clear_staging_table']:
+            assert "--clear-staging-table" in cmd
+
+        if self._config_export['batch']:
+            assert "--batch" in cmd
+
+        if self._config_export['relaxed_isolation']:
+            assert "--relaxed-isolation" in cmd
+
+    def test_import_cmd(self):
+        hook = SqoopHook()
+
+        # The subprocess requires an array but we build the cmd by joining on a space
+        cmd = ' '.join(
+            hook._import_cmd(self._config_import['target_dir'],
+                             append=self._config_import['append'],
+                             file_type=self._config_import['file_type'],
+                             split_by=self._config_import['split_by'],
+                             direct=self._config_import['direct'],
+                             driver=self._config_import['driver'])
+        )
+
+        if self._config_import['append']:
+            assert '--append' in cmd
+
+        if self._config_import['direct']:
+            assert '--direct' in cmd
+
+        assert '--target-dir {}'.format(
+            self._config_import['target_dir']) in cmd
+
+        assert '--driver {}'.format(self._config_import['driver']) in cmd
+        assert '--split-by {}'.format(self._config_import['split_by']) in cmd
+
+    def test_get_export_format_argument(self):
+        hook = SqoopHook()
+        assert "--as-avrodatafile" in hook._get_export_format_argument('avro')
+        assert "--as-parquetfile" in hook._get_export_format_argument(
+            'parquet')
+        assert "--as-sequencefile" in hook._get_export_format_argument(
+            'sequence')
+        assert "--as-textfile" in hook._get_export_format_argument('text')
+        assert "--as-textfile" in hook._get_export_format_argument('unknown')
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/82eb20e9/tests/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/sqoop_operator.py b/tests/contrib/operators/sqoop_operator.py
new file mode 100644
index 0000000..a46dc93
--- /dev/null
+++ b/tests/contrib/operators/sqoop_operator.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.sqoop_operator import SqoopOperator
+
+
+class TestSqoopOperator(unittest.TestCase):
+    _config = {
+        'cmd_type': 'export',
+        'table': 'target_table',
+        'query': 'SELECT * FROM schema.table',
+        'target_dir': '/path/on/hdfs/to/import',
+        'append': True,
+        'file_type': 'avro',
+        'columns': 'a,b,c',
+        'num_mappers': 22,
+        'split_by': 'id',
+        'export_dir': '/path/on/hdfs/to/export',
+        'input_null_string': '\n',
+        'input_null_non_string': '\t',
+        'staging_table': 'target_table_staging',
+        'clear_staging_table': True,
+        'enclosed_by': '"',
+        'escaped_by': '\\',
+        'input_fields_terminated_by': '|',
+        'input_lines_terminated_by': '\n',
+        'input_optionally_enclosed_by': '"',
+        'batch': True,
+        'relaxed_isolation': True,
+        'direct': True,
+        'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver',
+        'properties': {
+            'mapred.map.max.attempts': '1'
+        }
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_execute(self, conn_id='sqoop_default'):
+        operator = SqoopOperator(
+            task_id='sqoop_job',
+            dag=self.dag,
+            **self._config
+        )
+
+        self.assertEqual(conn_id, operator.conn_id)
+
+        self.assertEqual(self._config['cmd_type'], operator.cmd_type)
+        self.assertEqual(self._config['table'], operator.table)
+        self.assertEqual(self._config['target_dir'], operator.target_dir)
+        self.assertEqual(self._config['append'], operator.append)
+        self.assertEqual(self._config['file_type'], operator.file_type)
+        self.assertEqual(self._config['num_mappers'], operator.num_mappers)
+        self.assertEqual(self._config['split_by'], operator.split_by)
+        self.assertEqual(self._config['input_null_string'],
+                         operator.input_null_string)
+        self.assertEqual(self._config['input_null_non_string'],
+                         operator.input_null_non_string)
+        self.assertEqual(self._config['staging_table'], operator.staging_table)
+        self.assertEqual(self._config['clear_staging_table'],
+                         operator.clear_staging_table)
+        self.assertEqual(self._config['batch'], operator.batch)
+        self.assertEqual(self._config['relaxed_isolation'],
+                         operator.relaxed_isolation)
+        self.assertEqual(self._config['direct'], operator.direct)
+        self.assertEqual(self._config['driver'], operator.driver)
+        self.assertEqual(self._config['properties'], operator.properties)
+
+
+if __name__ == '__main__':
+    unittest.main()