You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/10/28 13:08:27 UTC
incubator-airflow git commit: [AIRFLOW-1734][Airflow 1734] Sqoop hook/operator enhancements
Repository: incubator-airflow
Updated Branches:
refs/heads/master efdc4d3b4 -> 1d531555e
[AIRFLOW-1734][Airflow 1734] Sqoop hook/operator enhancements
Closes #2703 from Acehaidrey/sqoop_contrib_fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1d531555
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1d531555
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1d531555
Branch: refs/heads/master
Commit: 1d531555ecd594ee7ec2c5d3fc87f8d4bcc2c27e
Parents: efdc4d3
Author: Ace Haidrey <ah...@pandora.com>
Authored: Sat Oct 28 15:07:56 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Oct 28 15:07:56 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/sqoop_hook.py | 91 ++++++++------
airflow/contrib/operators/sqoop_operator.py | 53 +++++++--
tests/contrib/hooks/test_sqoop_hook.py | 113 +++++++++++++-----
tests/contrib/operators/test_sqoop_operator.py | 124 ++++++++++++++++++--
4 files changed, 293 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 5b00b15..c56fbcb 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -43,7 +43,7 @@ class SqoopHook(BaseHook, LoggingMixin):
:param verbose: Set sqoop to verbose.
:type verbose: bool
:param num_mappers: Number of map tasks to import in parallel.
- :type num_mappers: str
+ :type num_mappers: int
:param properties: Properties to set via the -D argument
:type properties: dict
"""
@@ -52,8 +52,6 @@ class SqoopHook(BaseHook, LoggingMixin):
num_mappers=None, hcatalog_database=None,
hcatalog_table=None, properties=None):
# No mutable types in the default parameters
- if properties is None:
- properties = {}
self.conn = self.get_connection(conn_id)
connection_parameters = self.conn.extra_dejson
self.job_tracker = connection_parameters.get('job_tracker', None)
@@ -66,10 +64,11 @@ class SqoopHook(BaseHook, LoggingMixin):
self.hcatalog_table = hcatalog_table
self.verbose = verbose
self.num_mappers = num_mappers
- self.properties = properties
+ self.properties = properties or {}
+ self.log.info("Using connection to: {}:{}/{}".format(self.conn.host, self.conn.port, self.conn.schema))
def get_conn(self):
- pass
+ return self.conn
def cmd_mask_password(self, cmd):
try:
@@ -87,7 +86,7 @@ class SqoopHook(BaseHook, LoggingMixin):
:param kwargs: extra arguments to Popen (see subprocess.Popen)
:return: handle to subprocess
"""
- self.log.info("Executing command: %s", ' '.join(cmd))
+ self.log.info("Executing command: {}".format(' '.join(self.cmd_mask_password(cmd))))
sp = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
@@ -101,32 +100,33 @@ class SqoopHook(BaseHook, LoggingMixin):
self.log.info("Command exited with return code %s", sp.returncode)
if sp.returncode:
- raise AirflowException("Sqoop command failed: %s", ' '.join(cmd))
+ raise AirflowException("Sqoop command failed: {}".format(' '.join(self.cmd_mask_password(cmd))))
def _prepare_command(self, export=False):
- if export:
- connection_cmd = ["sqoop", "export"]
- else:
- connection_cmd = ["sqoop", "import"]
+ sqoop_cmd_type = "export" if export else "import"
+ connection_cmd = ["sqoop", sqoop_cmd_type]
- if self.verbose:
- connection_cmd += ["--verbose"]
+ for key, value in self.properties.items():
+ connection_cmd += ["-D", "{}={}".format(key, value)]
+
+ if self.namenode:
+ connection_cmd += ["-fs", self.namenode]
if self.job_tracker:
connection_cmd += ["-jt", self.job_tracker]
- if self.conn.login:
- connection_cmd += ["--username", self.conn.login]
- if self.conn.password:
- connection_cmd += ["--password", self.conn.password]
- if self.password_file:
- connection_cmd += ["--password-file", self.password_file]
if self.libjars:
connection_cmd += ["-libjars", self.libjars]
if self.files:
connection_cmd += ["-files", self.files]
- if self.namenode:
- connection_cmd += ["-fs", self.namenode]
if self.archives:
connection_cmd += ["-archives", self.archives]
+ if self.conn.login:
+ connection_cmd += ["--username", self.conn.login]
+ if self.conn.password:
+ connection_cmd += ["--password", self.conn.password]
+ if self.password_file:
+ connection_cmd += ["--password-file", self.password_file]
+ if self.verbose:
+ connection_cmd += ["--verbose"]
if self.num_mappers:
connection_cmd += ["--num-mappers", str(self.num_mappers)]
if self.hcatalog_database:
@@ -134,9 +134,6 @@ class SqoopHook(BaseHook, LoggingMixin):
if self.hcatalog_table:
connection_cmd += ["--hcatalog-table", self.hcatalog_table]
- for key, value in self.properties.items():
- connection_cmd += ["-D", "{}={}".format(key, value)]
-
connection_cmd += ["--connect", "{}:{}/{}".format(
self.conn.host,
self.conn.port,
@@ -145,7 +142,8 @@ class SqoopHook(BaseHook, LoggingMixin):
return connection_cmd
- def _get_export_format_argument(self, file_type='text'):
+ @staticmethod
+ def _get_export_format_argument(file_type='text'):
if file_type == "avro":
return ["--as-avrodatafile"]
elif file_type == "sequence":
@@ -159,11 +157,12 @@ class SqoopHook(BaseHook, LoggingMixin):
"'sequence', 'parquet' or 'text'.")
def _import_cmd(self, target_dir, append, file_type, split_by, direct,
- driver):
+ driver, extra_import_options):
cmd = self._prepare_command(export=False)
- cmd += ["--target-dir", target_dir]
+ if target_dir:
+ cmd += ["--target-dir", target_dir]
if append:
cmd += ["--append"]
@@ -179,11 +178,16 @@ class SqoopHook(BaseHook, LoggingMixin):
if driver:
cmd += ["--driver", driver]
+ if extra_import_options:
+ for key, value in extra_import_options.items():
+ cmd += ['--{}'.format(key)]
+ if value: cmd += [value]
+
return cmd
- def import_table(self, table, target_dir, append=False, file_type="text",
+ def import_table(self, table, target_dir=None, append=False, file_type="text",
columns=None, split_by=None, where=None, direct=False,
- driver=None):
+ driver=None, extra_import_options=None):
"""
Imports table from remote location to target dir. Arguments are
copies of direct sqoop command line arguments
@@ -197,9 +201,12 @@ class SqoopHook(BaseHook, LoggingMixin):
:param where: WHERE clause to use during import
:param direct: Use direct connector if exists for the database
:param driver: Manually specify JDBC driver class to use
+ :param extra_import_options: Extra import options to pass as dict.
+ If a key doesn't have a value, just pass an empty string to it.
+ Don't include prefix of -- for sqoop options.
"""
cmd = self._import_cmd(target_dir, append, file_type, split_by, direct,
- driver)
+ driver, extra_import_options)
cmd += ["--table", table]
@@ -210,9 +217,8 @@ class SqoopHook(BaseHook, LoggingMixin):
self.Popen(cmd)
- def import_query(self, query, target_dir,
- append=False, file_type="text",
- split_by=None, direct=None, driver=None):
+ def import_query(self, query, target_dir, append=False, file_type="text",
+ split_by=None, direct=None, driver=None, extra_import_options=None):
"""
Imports a specific query from the rdbms to hdfs
:param query: Free format query to run
@@ -223,9 +229,12 @@ class SqoopHook(BaseHook, LoggingMixin):
:param split_by: Column of the table used to split work units
:param direct: Use direct import fast path
:param driver: Manually specify JDBC driver class to use
+ :param extra_import_options: Extra import options to pass as dict.
+ If a key doesn't have a value, just pass an empty string to it.
+ Don't include prefix of -- for sqoop options.
"""
cmd = self._import_cmd(target_dir, append, file_type, split_by, direct,
- driver)
+ driver, extra_import_options)
cmd += ["--query", query]
self.Popen(cmd)
@@ -234,7 +243,7 @@ class SqoopHook(BaseHook, LoggingMixin):
input_null_non_string, staging_table, clear_staging_table,
enclosed_by, escaped_by, input_fields_terminated_by,
input_lines_terminated_by, input_optionally_enclosed_by,
- batch, relaxed_isolation):
+ batch, relaxed_isolation, extra_export_options):
cmd = self._prepare_command(export=True)
@@ -275,6 +284,11 @@ class SqoopHook(BaseHook, LoggingMixin):
if export_dir:
cmd += ["--export-dir", export_dir]
+ if extra_export_options:
+ for key, value in extra_export_options.items():
+ cmd += ['--{}'.format(key)]
+ if value: cmd += [value]
+
# The required option
cmd += ["--table", table]
@@ -286,7 +300,7 @@ class SqoopHook(BaseHook, LoggingMixin):
escaped_by, input_fields_terminated_by,
input_lines_terminated_by,
input_optionally_enclosed_by, batch,
- relaxed_isolation):
+ relaxed_isolation, extra_export_options=None):
"""
Exports Hive table to remote location. Arguments are copies of direct
sqoop command line Arguments
@@ -308,6 +322,9 @@ class SqoopHook(BaseHook, LoggingMixin):
:param batch: Use batch mode for underlying statement execution
:param relaxed_isolation: Transaction isolation to read uncommitted
for the mappers
+ :param extra_export_options: Extra export options to pass as dict.
+ If a key doesn't have a value, just pass an empty string to it.
+ Don't include prefix of -- for sqoop options.
"""
cmd = self._export_cmd(table, export_dir, input_null_string,
input_null_non_string, staging_table,
@@ -315,6 +332,6 @@ class SqoopHook(BaseHook, LoggingMixin):
input_fields_terminated_by,
input_lines_terminated_by,
input_optionally_enclosed_by, batch,
- relaxed_isolation)
+ relaxed_isolation, extra_export_options)
self.Popen(cmd)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index c3da176..cdaf336 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -25,8 +25,15 @@ from airflow.utils.decorators import apply_defaults
class SqoopOperator(BaseOperator):
"""
- execute sqoop job
+ Execute a Sqoop job.
+ Documentation for Apache Sqoop can be found here: https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html.
"""
+ template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type', 'columns', 'split_by',
+ 'where', 'export_dir', 'input_null_string', 'input_null_non_string', 'staging_table',
+ 'enclosed_by', 'escaped_by', 'input_fields_terminated_by', 'input_lines_terminated_by',
+ 'input_optionally_enclosed_by', 'properties', 'extra_import_options', 'driver',
+ 'extra_export_options', 'hcatalog_database', 'hcatalog_table',)
+ ui_color = '#7D8CA4'
@apply_defaults
def __init__(self,
@@ -36,7 +43,7 @@ class SqoopOperator(BaseOperator):
query=None,
target_dir=None,
append=None,
- file_type=None,
+ file_type='text',
columns=None,
num_mappers=None,
split_by=None,
@@ -59,12 +66,18 @@ class SqoopOperator(BaseOperator):
properties=None,
hcatalog_database=None,
hcatalog_table=None,
+ create_hcatalog_table=False,
+ extra_import_options=None,
+ extra_export_options=None,
*args,
**kwargs):
"""
:param conn_id: str
:param cmd_type: str specify command to execute "export" or "import"
:param table: Table to read
+ :param query: Import result of arbitrary SQL query. Instead of using the table,
+ columns and where arguments, you can specify a SQL statement with the query
+ argument. Must also specify a destination directory with target_dir.
:param target_dir: HDFS destination directory where the data
from the rdbms will be written
:param append: Append data to an existing dataset in HDFS
@@ -95,7 +108,14 @@ class SqoopOperator(BaseOperator):
:param relaxed_isolation: use read uncommitted isolation level
:param hcatalog_database: Specifies the database name for the HCatalog table
:param hcatalog_table: The argument value for this option is the HCatalog table
+ :param create_hcatalog_table: Have sqoop create the hcatalog table passed in or not
:param properties: additional JVM properties passed to sqoop
+ :param extra_import_options: Extra import options to pass as dict.
+ If a key doesn't have a value, just pass an empty string to it.
+ Don't include prefix of -- for sqoop options.
+ :param extra_export_options: Extra export options to pass as dict.
+ If a key doesn't have a value, just pass an empty string to it.
+ Don't include prefix of -- for sqoop options.
"""
super(SqoopOperator, self).__init__(*args, **kwargs)
self.conn_id = conn_id
@@ -126,10 +146,10 @@ class SqoopOperator(BaseOperator):
self.relaxed_isolation = relaxed_isolation
self.hcatalog_database = hcatalog_database
self.hcatalog_table = hcatalog_table
- # No mutable types in the default parameters
- if properties is None:
- properties = {}
+ self.create_hcatalog_table = create_hcatalog_table
self.properties = properties
+ self.extra_import_options = extra_import_options
+ self.extra_export_options = extra_export_options
def execute(self, context):
"""
@@ -156,9 +176,18 @@ class SqoopOperator(BaseOperator):
input_lines_terminated_by=self.input_lines_terminated_by,
input_optionally_enclosed_by=self.input_optionally_enclosed_by,
batch=self.batch,
- relaxed_isolation=self.relaxed_isolation)
+ relaxed_isolation=self.relaxed_isolation,
+ extra_export_options=self.extra_export_options)
elif self.cmd_type == 'import':
- if not self.table:
+ # add create hcatalog table to extra import options if option passed
+ # if new params are added to constructor can pass them in here so don't modify sqoop_hook for each param
+ if self.create_hcatalog_table:
+ self.extra_import_options['create-hcatalog-table'] = ''
+
+ if self.table and self.query:
+ raise AirflowException('Cannot specify query and table together. Need to specify either or.')
+
+ if self.table:
hook.import_table(
table=self.table,
target_dir=self.target_dir,
@@ -168,16 +197,18 @@ class SqoopOperator(BaseOperator):
split_by=self.split_by,
where=self.where,
direct=self.direct,
- driver=self.driver)
- elif not self.query:
+ driver=self.driver,
+ extra_import_options=self.extra_import_options)
+ elif self.query:
hook.import_query(
- query=self.table,
+ query=self.query,
target_dir=self.target_dir,
append=self.append,
file_type=self.file_type,
split_by=self.split_by,
direct=self.direct,
- driver=self.driver)
+ driver=self.driver,
+ extra_import_options=self.extra_import_options)
else:
raise AirflowException(
"Provide query or table parameter to import using Sqoop"
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/tests/contrib/hooks/test_sqoop_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_sqoop_hook.py b/tests/contrib/hooks/test_sqoop_hook.py
index 7c934b9..3eba9ec 100644
--- a/tests/contrib/hooks/test_sqoop_hook.py
+++ b/tests/contrib/hooks/test_sqoop_hook.py
@@ -13,6 +13,7 @@
# limitations under the License.
#
+import collections
import json
import unittest
@@ -40,8 +41,8 @@ class TestSqoopHook(unittest.TestCase):
_config_export = {
'table': 'domino.export_data_to',
'export_dir': '/hdfs/data/to/be/exported',
- 'input_null_string': '\n',
- 'input_null_non_string': '\t',
+ 'input_null_string': '\\n',
+ 'input_null_non_string': '\\t',
'staging_table': 'database.staging',
'clear_staging_table': True,
'enclosed_by': '"',
@@ -50,7 +51,11 @@ class TestSqoopHook(unittest.TestCase):
'input_lines_terminated_by': '\n',
'input_optionally_enclosed_by': '"',
'batch': True,
- 'relaxed_isolation': True
+ 'relaxed_isolation': True,
+ 'extra_export_options': collections.OrderedDict([
+ ('update-key', 'id'),
+ ('update-mode', 'allowinsert')
+ ])
}
_config_import = {
'target_dir': '/hdfs/data/target/location',
@@ -58,7 +63,11 @@ class TestSqoopHook(unittest.TestCase):
'file_type': 'parquet',
'split_by': '\n',
'direct': True,
- 'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver'
+ 'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver',
+ 'extra_import_options': {
+ 'hcatalog-storage-stanza': "\"stored as orcfile\"",
+ 'show': ''
+ }
}
_config_json = {
@@ -94,10 +103,10 @@ class TestSqoopHook(unittest.TestCase):
self.assertEqual(mock_popen.mock_calls[0], call(
['sqoop',
'export',
+ '-fs', self._config_json['namenode'],
'-jt', self._config_json['job_tracker'],
'-libjars', self._config_json['libjars'],
'-files', self._config_json['files'],
- '-fs', self._config_json['namenode'],
'-archives', self._config_json['archives'],
'--connect', 'rmdbs:5050/schema',
'--input-null-string', self._config_export['input_null_string'],
@@ -112,9 +121,25 @@ class TestSqoopHook(unittest.TestCase):
'--batch',
'--relaxed-isolation',
'--export-dir', self._config_export['export_dir'],
+ '--update-key', 'id',
+ '--update-mode', 'allowinsert',
'--table', self._config_export['table']], stderr=-2, stdout=-1))
+ def test_submit_none_mappers(self):
+ """
+ Verify that '--num-mappers' is omitted from the command built by _prepare_command() when num_mappers is None.
+ """
+ _config_without_mappers = self._config.copy()
+ _config_without_mappers['num_mappers'] = None
+
+ hook = SqoopHook(**_config_without_mappers)
+ cmd = ' '.join(hook._prepare_command())
+ self.assertNotIn('--num-mappers', cmd)
+
def test_submit(self):
+ """
+ Tests to verify that from connection extra option the options are added to the Sqoop command.
+ """
hook = SqoopHook(**self._config)
cmd = ' '.join(hook._prepare_command())
@@ -124,20 +149,16 @@ class TestSqoopHook(unittest.TestCase):
self.assertIn("-fs {}".format(self._config_json['namenode']), cmd)
if self._config_json['job_tracker']:
- self.assertIn("-jt {}".format(self._config_json['job_tracker']),
- cmd)
+ self.assertIn("-jt {}".format(self._config_json['job_tracker']), cmd)
if self._config_json['libjars']:
- self.assertIn("-libjars {}".format(self._config_json['libjars']),
- cmd)
+ self.assertIn("-libjars {}".format(self._config_json['libjars']), cmd)
if self._config_json['files']:
self.assertIn("-files {}".format(self._config_json['files']), cmd)
if self._config_json['archives']:
- self.assertIn(
- "-archives {}".format(self._config_json['archives']), cmd
- )
+ self.assertIn( "-archives {}".format(self._config_json['archives']), cmd)
self.assertIn("--hcatalog-database {}".format(self._config['hcatalog_database']), cmd)
self.assertIn("--hcatalog-table {}".format(self._config['hcatalog_table']), cmd)
@@ -147,11 +168,8 @@ class TestSqoopHook(unittest.TestCase):
self.assertIn("--verbose", cmd)
if self._config['num_mappers']:
- self.assertIn(
- "--num-mappers {}".format(self._config['num_mappers']), cmd
- )
+ self.assertIn( "--num-mappers {}".format(self._config['num_mappers']), cmd)
- print(self._config['properties'])
for key, value in self._config['properties'].items():
self.assertIn("-D {}={}".format(key, value), cmd)
@@ -161,14 +179,15 @@ class TestSqoopHook(unittest.TestCase):
hook.export_table(**self._config_export)
with self.assertRaises(OSError):
- hook.import_table(table='schema.table',
- target_dir='/sqoop/example/path')
+ hook.import_table(table='schema.table', target_dir='/sqoop/example/path')
with self.assertRaises(OSError):
- hook.import_query(query='SELECT * FROM sometable',
- target_dir='/sqoop/example/path')
+ hook.import_query(query='SELECT * FROM sometable', target_dir='/sqoop/example/path')
def test_export_cmd(self):
+ """
+ Tests to verify the hook export command is building correct Sqoop export command.
+ """
hook = SqoopHook()
# The subprocess requires an array but we build the cmd by joining on a space
@@ -190,7 +209,9 @@ class TestSqoopHook(unittest.TestCase):
input_optionally_enclosed_by=self._config_export[
'input_optionally_enclosed_by'],
batch=self._config_export['batch'],
- relaxed_isolation=self._config_export['relaxed_isolation'])
+ relaxed_isolation=self._config_export['relaxed_isolation'],
+ extra_export_options=self._config_export['extra_export_options']
+ )
)
self.assertIn("--input-null-string {}".format(
@@ -209,6 +230,9 @@ class TestSqoopHook(unittest.TestCase):
self._config_export['input_lines_terminated_by']), cmd)
self.assertIn("--input-optionally-enclosed-by {}".format(
self._config_export['input_optionally_enclosed_by']), cmd)
+ # these options are from the extra export options
+ self.assertIn("--update-key id", cmd)
+ self.assertIn("--update-mode allowinsert", cmd)
if self._config_export['clear_staging_table']:
self.assertIn("--clear-staging-table", cmd)
@@ -220,16 +244,22 @@ class TestSqoopHook(unittest.TestCase):
self.assertIn("--relaxed-isolation", cmd)
def test_import_cmd(self):
+ """
+ Tests to verify the hook import command is building correct Sqoop import command.
+ """
hook = SqoopHook()
# The subprocess requires an array but we build the cmd by joining on a space
cmd = ' '.join(
- hook._import_cmd(self._config_import['target_dir'],
- append=self._config_import['append'],
- file_type=self._config_import['file_type'],
- split_by=self._config_import['split_by'],
- direct=self._config_import['direct'],
- driver=self._config_import['driver'])
+ hook._import_cmd(
+ self._config_import['target_dir'],
+ append=self._config_import['append'],
+ file_type=self._config_import['file_type'],
+ split_by=self._config_import['split_by'],
+ direct=self._config_import['direct'],
+ driver=self._config_import['driver'],
+ extra_import_options=None
+ )
)
if self._config_import['append']:
@@ -242,10 +272,32 @@ class TestSqoopHook(unittest.TestCase):
self._config_import['target_dir']), cmd)
self.assertIn('--driver {}'.format(self._config_import['driver']), cmd)
- self.assertIn('--split-by {}'.format(self._config_import['split_by']),
- cmd)
+ self.assertIn('--split-by {}'.format(self._config_import['split_by']), cmd)
+ # these are from extra options, but not passed to this cmd import command
+ self.assertNotIn('--show', cmd)
+ self.assertNotIn('hcatalog-storage-stanza \"stored as orcfile\"', cmd)
+
+ cmd = ' '.join(
+ hook._import_cmd(
+ target_dir=None,
+ append=self._config_import['append'],
+ file_type=self._config_import['file_type'],
+ split_by=self._config_import['split_by'],
+ direct=self._config_import['direct'],
+ driver=self._config_import['driver'],
+ extra_import_options=self._config_import['extra_import_options']
+ )
+ )
+
+ self.assertNotIn('--target-dir', cmd)
+ # these checks are from the extra import options
+ self.assertIn('--show', cmd)
+ self.assertIn('hcatalog-storage-stanza \"stored as orcfile\"', cmd)
def test_get_export_format_argument(self):
+ """
+ Tests to verify the hook get format function is building correct Sqoop command with correct format type.
+ """
hook = SqoopHook()
self.assertIn("--as-avrodatafile",
hook._get_export_format_argument('avro'))
@@ -259,6 +311,9 @@ class TestSqoopHook(unittest.TestCase):
hook._get_export_format_argument('unknown')
def test_cmd_mask_password(self):
+ """
+ Tests to verify the hook masking function will correctly mask a user password in Sqoop command.
+ """
hook = SqoopHook()
self.assertEqual(
hook.cmd_mask_password(['--password', 'supersecret']),
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/tests/contrib/operators/test_sqoop_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sqoop_operator.py b/tests/contrib/operators/test_sqoop_operator.py
index f632805..d9e39b5 100644
--- a/tests/contrib/operators/test_sqoop_operator.py
+++ b/tests/contrib/operators/test_sqoop_operator.py
@@ -23,6 +23,7 @@ from airflow.exceptions import AirflowException
class TestSqoopOperator(unittest.TestCase):
_config = {
+ 'conn_id': 'sqoop_default',
'cmd_type': 'export',
'table': 'target_table',
'query': 'SELECT * FROM schema.table',
@@ -46,10 +47,19 @@ class TestSqoopOperator(unittest.TestCase):
'relaxed_isolation': True,
'direct': True,
'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver',
+ 'create_hcatalog_table': True,
'hcatalog_database': 'hive_database',
'hcatalog_table': 'hive_table',
'properties': {
'mapred.map.max.attempts': '1'
+ },
+ 'extra_import_options': {
+ 'hcatalog-storage-stanza': "\"stored as orcfile\"",
+ 'show': ''
+ },
+ 'extra_export_options': {
+ 'update-key': 'id',
+ 'update-mode': 'allowinsert'
}
}
@@ -61,15 +71,18 @@ class TestSqoopOperator(unittest.TestCase):
}
self.dag = DAG('test_dag_id', default_args=args)
- def test_execute(self, conn_id='sqoop_default'):
+ def test_execute(self):
+ """
+ Tests to verify values of the SqoopOperator match that passed in from the config.
+ """
operator = SqoopOperator(
task_id='sqoop_job',
dag=self.dag,
**self._config
)
- self.assertEqual(conn_id, operator.conn_id)
-
+ self.assertEqual(self._config['conn_id'], operator.conn_id)
+ self.assertEqual(self._config['query'], operator.query)
self.assertEqual(self._config['cmd_type'], operator.cmd_type)
self.assertEqual(self._config['table'], operator.table)
self.assertEqual(self._config['target_dir'], operator.target_dir)
@@ -77,28 +90,117 @@ class TestSqoopOperator(unittest.TestCase):
self.assertEqual(self._config['file_type'], operator.file_type)
self.assertEqual(self._config['num_mappers'], operator.num_mappers)
self.assertEqual(self._config['split_by'], operator.split_by)
- self.assertEqual(self._config['input_null_string'],
- operator.input_null_string)
- self.assertEqual(self._config['input_null_non_string'],
- operator.input_null_non_string)
+ self.assertEqual(self._config['input_null_string'], operator.input_null_string)
+ self.assertEqual(self._config['input_null_non_string'], operator.input_null_non_string)
self.assertEqual(self._config['staging_table'], operator.staging_table)
- self.assertEqual(self._config['clear_staging_table'],
- operator.clear_staging_table)
+ self.assertEqual(self._config['clear_staging_table'], operator.clear_staging_table)
self.assertEqual(self._config['batch'], operator.batch)
- self.assertEqual(self._config['relaxed_isolation'],
- operator.relaxed_isolation)
+ self.assertEqual(self._config['relaxed_isolation'], operator.relaxed_isolation)
self.assertEqual(self._config['direct'], operator.direct)
self.assertEqual(self._config['driver'], operator.driver)
self.assertEqual(self._config['properties'], operator.properties)
self.assertEqual(self._config['hcatalog_database'], operator.hcatalog_database)
self.assertEqual(self._config['hcatalog_table'], operator.hcatalog_table)
+ self.assertEqual(self._config['create_hcatalog_table'], operator.create_hcatalog_table)
+ self.assertEqual(self._config['extra_import_options'], operator.extra_import_options)
+ self.assertEqual(self._config['extra_export_options'], operator.extra_export_options)
+
+ # the following are meant to be more of examples
+ sqoop_import_op = SqoopOperator(
+ task_id='sqoop_import_using_table',
+ cmd_type='import',
+ conn_id='sqoop_default',
+ table='company',
+ verbose=True,
+ num_mappers=8,
+ hcatalog_database='default',
+ hcatalog_table='import_table_1',
+ create_hcatalog_table=True,
+ extra_import_options={'hcatalog-storage-stanza': "\"stored as orcfile\""},
+ dag=self.dag
+ )
+
+ sqoop_import_op_qry = SqoopOperator(
+ task_id='sqoop_import_using_query',
+ cmd_type='import',
+ conn_id='sqoop_default',
+ query='select name, age from company where $CONDITIONS',
+ split_by='age', # the mappers will pass in values to the $CONDITIONS based on the field you select to split by
+ verbose=True,
+ num_mappers=None,
+ hcatalog_database='default',
+ hcatalog_table='import_table_2',
+ create_hcatalog_table=True,
+ extra_import_options={'hcatalog-storage-stanza': "\"stored as orcfile\""},
+ dag=self.dag
+ )
+
+ sqoop_import_op_with_partition = SqoopOperator(
+ task_id='sqoop_import_with_partition',
+ cmd_type='import',
+ conn_id='sqoop_default',
+ table='company',
+ verbose=True,
+ num_mappers=None,
+ hcatalog_database='default',
+ hcatalog_table='import_table_3',
+ create_hcatalog_table=True,
+ extra_import_options={
+ 'hcatalog-storage-stanza': "\"stored as orcfile\"",
+ 'hive-partition-key': 'day',
+ 'hive-partition-value': '2017-10-18'},
+ dag=self.dag
+ )
+
+ sqoop_export_op_name = SqoopOperator(
+ task_id='sqoop_export_tablename',
+ cmd_type='export',
+ conn_id='sqoop_default',
+ table='rbdms_export_table_1',
+ verbose=True,
+ num_mappers=None,
+ hcatalog_database='default',
+ hcatalog_table='hive_export_table_1',
+ extra_export_options=None,
+ dag=self.dag
+ )
+
+ sqoop_export_op_path = SqoopOperator(
+ task_id='sqoop_export_tablepath',
+ cmd_type='export',
+ conn_id='sqoop_default',
+ table='rbdms_export_table_2',
+ export_dir='/user/hive/warehouse/export_table_2',
+ direct=True, # speeds up for data transfer
+ verbose=True,
+ num_mappers=None,
+ extra_export_options=None,
+ dag=self.dag
+ )
def test_invalid_cmd_type(self):
+ """
+ Tests to verify if the cmd_type is not import or export, an exception is raised.
+ """
operator = SqoopOperator(task_id='sqoop_job', dag=self.dag,
cmd_type='invalid')
with self.assertRaises(AirflowException):
operator.execute({})
+ def test_invalid_import_options(self):
+ """
+ Verify that execute() raises AirflowException for an import configured with both a table and a query.
+ """
+ import_query_and_table_configs = self._config.copy()
+ import_query_and_table_configs['cmd_type'] = 'import'
+ operator = SqoopOperator(
+ task_id='sqoop_job',
+ dag=self.dag,
+ **import_query_and_table_configs
+ )
+ with self.assertRaises(AirflowException):
+ operator.execute({})
+
if __name__ == '__main__':
unittest.main()