You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/21 10:08:55 UTC
incubator-airflow git commit: [AIRFLOW-1136] Capture invalid arguments for Sqoop
Repository: incubator-airflow
Updated Branches:
refs/heads/master 659827639 -> 2ef4dbbe0
[AIRFLOW-1136] Capture invalid arguments for Sqoop
Invalid arguments were previously not caught by the
SqoopHook and SqoopOperator. With this change:
- SqoopHook raises an AirflowException if the
file_type is invalid
- SqoopOperator raises an AirflowException if the
cmd_type is invalid
Closes #2252 from hgrif/AIRFLOW-1136
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2ef4dbbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2ef4dbbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2ef4dbbe
Branch: refs/heads/master
Commit: 2ef4dbbe0bf6e8ca116ad01bf209e7155d311d43
Parents: 6598276
Author: Henk Griffioen <hg...@users.noreply.github.com>
Authored: Fri Apr 21 12:08:48 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Apr 21 12:08:48 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/sqoop_hook.py | 50 ++++++-----
airflow/contrib/operators/sqoop_operator.py | 6 +-
tests/contrib/hooks/test_sqoop_hook.py | 92 ++++++++++++---------
tests/contrib/operators/test_sqoop_operator.py | 7 ++
4 files changed, 91 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index e1f4779..6c5ee58 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -28,21 +28,27 @@ log = logging.getLogger(__name__)
class SqoopHook(BaseHook):
"""
- This Hook is a wrapper around the sqoop 1 binary. To be able to use te hook
+ This hook is a wrapper around the sqoop 1 binary. To be able to use the hook
it is required that "sqoop" is in the PATH.
- :param job_tracker: (from json) specify a job tracker local|jobtracker:port
- :type job_tracker: str
- :param namenode: (from json) specify a namenode
- :type namenode: str
- :param lib_jars: (from json) specify comma separated jar
- files to include in the classpath.
- :type lib_jars: str
- :param files: (from json) specify comma separated files to be copied to
- the map reduce cluster
- :type files: (from json) str
- :param archives: (from json) specify comma separated archives to be
- unarchived on the compute machines.
- :type archives: str
+
+ Additional arguments that can be passed via the 'extra' JSON field of the
+ sqoop connection:
+ * job_tracker: Job tracker local|jobtracker:port.
+ * namenode: Namenode.
+ * lib_jars: Comma separated jar files to include in the classpath.
+ * files: Comma separated files to be copied to the map reduce cluster.
+ * archives: Comma separated archives to be unarchived on the compute
+ machines.
+ * password_file: Path to file containing the password.
+
+ :param conn_id: Reference to the sqoop connection.
+ :type conn_id: str
+ :param verbose: Set sqoop to verbose.
+ :type verbose: bool
+ :param num_mappers: Number of map tasks to import in parallel.
+ :type num_mappers: str
+ :param properties: Properties to set via the -D argument
+ :type properties: dict
"""
def __init__(self, conn_id='sqoop_default', verbose=False,
@@ -80,12 +86,11 @@ class SqoopHook(BaseHook):
output, stderr = process.communicate()
if process.returncode != 0:
- raise AirflowException((
- "Cannot execute {} on {}. Error code is: {}"
- "Output: {}, Stderr: {}"
- ).format(cmd, self.conn.host,
- process.returncode, output,
- stderr))
+ raise AirflowException(
+ "Cannot execute {} on {}. Error code is: {} Output: {}, "
+ "Stderr: {}".format(cmd, self.conn.host, process.returncode,
+ output, stderr)
+ )
def _prepare_command(self, export=False):
if export:
@@ -132,8 +137,11 @@ class SqoopHook(BaseHook):
return ["--as-sequencefile"]
elif file_type == "parquet":
return ["--as-parquetfile"]
- else:
+ elif file_type == "text":
return ["--as-textfile"]
+ else:
+ raise AirflowException("Argument file_type should be 'avro', "
+ "'sequence', 'parquet' or 'text'.")
def _import_cmd(self, target_dir, append, file_type, split_by, direct,
driver):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index 3dd9403..b0dc88f 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -134,7 +134,7 @@ class SqoopOperator(BaseOperator):
num_mappers=self.num_mappers,
properties=self.properties)
- if self.cmd_type is 'export':
+ if self.cmd_type == 'export':
hook.export_table(
table=self.table,
export_dir=self.export_dir,
@@ -149,7 +149,7 @@ class SqoopOperator(BaseOperator):
input_optionally_enclosed_by=self.input_optionally_enclosed_by,
batch=self.batch,
relaxed_isolation=self.relaxed_isolation)
- else:
+ elif self.cmd_type == 'import':
if not self.table:
hook.import_table(
table=self.table,
@@ -174,3 +174,5 @@ class SqoopOperator(BaseOperator):
raise AirflowException(
"Provide query or table parameter to import using Sqoop"
)
+ else:
+ raise AirflowException("cmd_type should be 'import' or 'export'")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/tests/contrib/hooks/test_sqoop_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_sqoop_hook.py b/tests/contrib/hooks/test_sqoop_hook.py
index ca8033b..5abe1a0 100644
--- a/tests/contrib/hooks/test_sqoop_hook.py
+++ b/tests/contrib/hooks/test_sqoop_hook.py
@@ -18,6 +18,7 @@ import unittest
from airflow import configuration, models
from airflow.contrib.hooks.sqoop_hook import SqoopHook
+from airflow.exceptions import AirflowException
from airflow.utils import db
@@ -88,31 +89,36 @@ class TestSqoopHook(unittest.TestCase):
# Check if the config has been extracted from the json
if self._config_json['namenode']:
- assert "-fs {}".format(self._config_json['namenode']) in cmd
+ self.assertIn("-fs {}".format(self._config_json['namenode']), cmd)
if self._config_json['job_tracker']:
- assert "-jt {}".format(self._config_json['job_tracker']) in cmd
+ self.assertIn("-jt {}".format(self._config_json['job_tracker']),
+ cmd)
if self._config_json['libjars']:
- assert "-libjars {}".format(self._config_json['libjars']) in cmd
+ self.assertIn("-libjars {}".format(self._config_json['libjars']),
+ cmd)
if self._config_json['files']:
- assert "-files {}".format(self._config_json['files']) in cmd
+ self.assertIn("-files {}".format(self._config_json['files']), cmd)
if self._config_json['archives']:
- assert "-archives {}".format(self._config_json['archives']) in cmd
+ self.assertIn(
+ "-archives {}".format(self._config_json['archives']), cmd
+ )
# Check the regulator stuff passed by the default constructor
if self._config['verbose']:
- assert "--verbose" in cmd
+ self.assertIn("--verbose", cmd)
if self._config['num_mappers']:
- assert "--num-mappers {}".format(
- self._config['num_mappers']) in cmd
+ self.assertIn(
+ "--num-mappers {}".format(self._config['num_mappers']), cmd
+ )
print(self._config['properties'])
for key, value in self._config['properties'].items():
- assert "-D {}={}".format(key, value) in cmd
+ self.assertIn("-D {}={}".format(key, value), cmd)
# We don't have the sqoop binary available, and this is hard to mock,
# so just accept an exception for now.
@@ -152,31 +158,31 @@ class TestSqoopHook(unittest.TestCase):
relaxed_isolation=self._config_export['relaxed_isolation'])
)
- assert "--input-null-string {}".format(
- self._config_export['input_null_string']) in cmd
- assert "--input-null-non-string {}".format(
- self._config_export['input_null_non_string']) in cmd
- assert "--staging-table {}".format(
- self._config_export['staging_table']) in cmd
- assert "--enclosed-by {}".format(
- self._config_export['enclosed_by']) in cmd
- assert "--escaped-by {}".format(
- self._config_export['escaped_by']) in cmd
- assert "--input-fields-terminated-by {}".format(
- self._config_export['input_fields_terminated_by']) in cmd
- assert "--input-lines-terminated-by {}".format(
- self._config_export['input_lines_terminated_by']) in cmd
- assert "--input-optionally-enclosed-by {}".format(
- self._config_export['input_optionally_enclosed_by']) in cmd
+ self.assertIn("--input-null-string {}".format(
+ self._config_export['input_null_string']), cmd)
+ self.assertIn("--input-null-non-string {}".format(
+ self._config_export['input_null_non_string']), cmd)
+ self.assertIn("--staging-table {}".format(
+ self._config_export['staging_table']), cmd)
+ self.assertIn("--enclosed-by {}".format(
+ self._config_export['enclosed_by']), cmd)
+ self.assertIn("--escaped-by {}".format(
+ self._config_export['escaped_by']), cmd)
+ self.assertIn("--input-fields-terminated-by {}".format(
+ self._config_export['input_fields_terminated_by']), cmd)
+ self.assertIn("--input-lines-terminated-by {}".format(
+ self._config_export['input_lines_terminated_by']), cmd)
+ self.assertIn("--input-optionally-enclosed-by {}".format(
+ self._config_export['input_optionally_enclosed_by']), cmd)
if self._config_export['clear_staging_table']:
- assert "--clear-staging-table" in cmd
+ self.assertIn("--clear-staging-table", cmd)
if self._config_export['batch']:
- assert "--batch" in cmd
+ self.assertIn("--batch", cmd)
if self._config_export['relaxed_isolation']:
- assert "--relaxed-isolation" in cmd
+ self.assertIn("--relaxed-isolation", cmd)
def test_import_cmd(self):
hook = SqoopHook()
@@ -192,26 +198,30 @@ class TestSqoopHook(unittest.TestCase):
)
if self._config_import['append']:
- assert '--append' in cmd
+ self.assertIn('--append', cmd)
if self._config_import['direct']:
- assert '--direct' in cmd
+ self.assertIn('--direct', cmd)
- assert '--target-dir {}'.format(
- self._config_import['target_dir']) in cmd
+ self.assertIn('--target-dir {}'.format(
+ self._config_import['target_dir']), cmd)
- assert '--driver {}'.format(self._config_import['driver']) in cmd
- assert '--split-by {}'.format(self._config_import['split_by']) in cmd
+ self.assertIn('--driver {}'.format(self._config_import['driver']), cmd)
+ self.assertIn('--split-by {}'.format(self._config_import['split_by']),
+ cmd)
def test_get_export_format_argument(self):
hook = SqoopHook()
- assert "--as-avrodatafile" in hook._get_export_format_argument('avro')
- assert "--as-parquetfile" in hook._get_export_format_argument(
- 'parquet')
- assert "--as-sequencefile" in hook._get_export_format_argument(
- 'sequence')
- assert "--as-textfile" in hook._get_export_format_argument('text')
- assert "--as-textfile" in hook._get_export_format_argument('unknown')
+ self.assertIn("--as-avrodatafile",
+ hook._get_export_format_argument('avro'))
+ self.assertIn("--as-parquetfile",
+ hook._get_export_format_argument('parquet'))
+ self.assertIn("--as-sequencefile",
+ hook._get_export_format_argument('sequence'))
+ self.assertIn("--as-textfile",
+ hook._get_export_format_argument('text'))
+ with self.assertRaises(AirflowException):
+ hook._get_export_format_argument('unknown')
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/tests/contrib/operators/test_sqoop_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sqoop_operator.py b/tests/contrib/operators/test_sqoop_operator.py
index a46dc93..679b22e 100644
--- a/tests/contrib/operators/test_sqoop_operator.py
+++ b/tests/contrib/operators/test_sqoop_operator.py
@@ -18,6 +18,7 @@ import unittest
from airflow import DAG, configuration
from airflow.contrib.operators.sqoop_operator import SqoopOperator
+from airflow.exceptions import AirflowException
class TestSqoopOperator(unittest.TestCase):
@@ -88,6 +89,12 @@ class TestSqoopOperator(unittest.TestCase):
self.assertEqual(self._config['driver'], operator.driver)
self.assertEqual(self._config['properties'], operator.properties)
+ def test_invalid_cmd_type(self):
+ operator = SqoopOperator(task_id='sqoop_job', dag=self.dag,
+ cmd_type='invalid')
+ with self.assertRaises(AirflowException):
+ operator.execute({})
+
if __name__ == '__main__':
unittest.main()