You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/21 10:08:55 UTC

incubator-airflow git commit: [AIRFLOW-1136] Capture invalid arguments for Sqoop

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 659827639 -> 2ef4dbbe0


[AIRFLOW-1136] Capture invalid arguments for Sqoop

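Invalid arguments are silently accepted by the
SqoopHook and SqoopOperator instead of being
rejected:
- SqoopHook should raise an exception if the
file_type is invalid (it currently falls back
to text output)
- SqoopOperator should raise an exception if the
cmd_type is invalid (it currently falls back to
an import)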

Closes #2252 from hgrif/AIRFLOW-1136


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2ef4dbbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2ef4dbbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2ef4dbbe

Branch: refs/heads/master
Commit: 2ef4dbbe0bf6e8ca116ad01bf209e7155d311d43
Parents: 6598276
Author: Henk Griffioen <hg...@users.noreply.github.com>
Authored: Fri Apr 21 12:08:48 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Apr 21 12:08:48 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/sqoop_hook.py            | 50 ++++++-----
 airflow/contrib/operators/sqoop_operator.py    |  6 +-
 tests/contrib/hooks/test_sqoop_hook.py         | 92 ++++++++++++---------
 tests/contrib/operators/test_sqoop_operator.py |  7 ++
 4 files changed, 91 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index e1f4779..6c5ee58 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -28,21 +28,27 @@ log = logging.getLogger(__name__)
 
 class SqoopHook(BaseHook):
     """
-    This Hook is a wrapper around the sqoop 1 binary. To be able to use te hook
+    This hook is a wrapper around the sqoop 1 binary. To be able to use the hook
     it is required that "sqoop" is in the PATH.
-    :param job_tracker: (from json) specify a job tracker local|jobtracker:port
-    :type job_tracker: str
-    :param namenode: (from json) specify a namenode
-    :type namenode: str
-    :param lib_jars: (from json) specify comma separated jar
-        files to include in the classpath.
-    :type lib_jars: str
-    :param files: (from json) specify comma separated files to be copied to
-        the map reduce cluster
-    :type files: (from json) str
-    :param archives: (from json)  specify comma separated archives to be
-        unarchived on the compute machines.
-    :type archives: str
+
+    Additional arguments that can be passed via the 'extra' JSON field of the
+    sqoop connection:
+    * job_tracker: Job tracker local|jobtracker:port.
+    * namenode: Namenode.
+    * lib_jars: Comma separated jar files to include in the classpath.
+    * files: Comma separated files to be copied to the map reduce cluster.
+    * archives: Comma separated archives to be unarchived on the compute
+        machines.
+    * password_file: Path to file containing the password.
+
+    :param conn_id: Reference to the sqoop connection.
+    :type conn_id: str
+    :param verbose: Set sqoop to verbose.
+    :type verbose: bool
+    :param num_mappers: Number of map tasks to import in parallel.
+    :type num_mappers: str
+    :param properties: Properties to set via the -D argument
+    :type properties: dict
     """
 
     def __init__(self, conn_id='sqoop_default', verbose=False,
@@ -80,12 +86,11 @@ class SqoopHook(BaseHook):
         output, stderr = process.communicate()
 
         if process.returncode != 0:
-            raise AirflowException((
-                                       "Cannot execute {} on {}. Error code is: {}"
-                                       "Output: {}, Stderr: {}"
-                                   ).format(cmd, self.conn.host,
-                                            process.returncode, output,
-                                            stderr))
+            raise AirflowException(
+                "Cannot execute {} on {}. Error code is: {} Output: {}, "
+                "Stderr: {}".format(cmd, self.conn.host, process.returncode,
+                                    output, stderr)
+            )
 
     def _prepare_command(self, export=False):
         if export:
@@ -132,8 +137,11 @@ class SqoopHook(BaseHook):
             return ["--as-sequencefile"]
         elif file_type == "parquet":
             return ["--as-parquetfile"]
-        else:
+        elif file_type == "text":
             return ["--as-textfile"]
+        else:
+            raise AirflowException("Argument file_type should be 'avro', "
+                                   "'sequence', 'parquet' or 'text'.")
 
     def _import_cmd(self, target_dir, append, file_type, split_by, direct,
                     driver):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index 3dd9403..b0dc88f 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -134,7 +134,7 @@ class SqoopOperator(BaseOperator):
                          num_mappers=self.num_mappers,
                          properties=self.properties)
 
-        if self.cmd_type is 'export':
+        if self.cmd_type == 'export':
             hook.export_table(
                 table=self.table,
                 export_dir=self.export_dir,
@@ -149,7 +149,7 @@ class SqoopOperator(BaseOperator):
                 input_optionally_enclosed_by=self.input_optionally_enclosed_by,
                 batch=self.batch,
                 relaxed_isolation=self.relaxed_isolation)
-        else:
+        elif self.cmd_type == 'import':
             if not self.table:
                 hook.import_table(
                     table=self.table,
@@ -174,3 +174,5 @@ class SqoopOperator(BaseOperator):
                 raise AirflowException(
                     "Provide query or table parameter to import using Sqoop"
                 )
+        else:
+            raise AirflowException("cmd_type should be 'import' or 'export'")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/tests/contrib/hooks/test_sqoop_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_sqoop_hook.py b/tests/contrib/hooks/test_sqoop_hook.py
index ca8033b..5abe1a0 100644
--- a/tests/contrib/hooks/test_sqoop_hook.py
+++ b/tests/contrib/hooks/test_sqoop_hook.py
@@ -18,6 +18,7 @@ import unittest
 
 from airflow import configuration, models
 from airflow.contrib.hooks.sqoop_hook import SqoopHook
+from airflow.exceptions import AirflowException
 from airflow.utils import db
 
 
@@ -88,31 +89,36 @@ class TestSqoopHook(unittest.TestCase):
 
         # Check if the config has been extracted from the json
         if self._config_json['namenode']:
-            assert "-fs {}".format(self._config_json['namenode']) in cmd
+            self.assertIn("-fs {}".format(self._config_json['namenode']), cmd)
 
         if self._config_json['job_tracker']:
-            assert "-jt {}".format(self._config_json['job_tracker']) in cmd
+            self.assertIn("-jt {}".format(self._config_json['job_tracker']),
+                          cmd)
 
         if self._config_json['libjars']:
-            assert "-libjars {}".format(self._config_json['libjars']) in cmd
+            self.assertIn("-libjars {}".format(self._config_json['libjars']),
+                          cmd)
 
         if self._config_json['files']:
-            assert "-files {}".format(self._config_json['files']) in cmd
+            self.assertIn("-files {}".format(self._config_json['files']), cmd)
 
         if self._config_json['archives']:
-            assert "-archives {}".format(self._config_json['archives']) in cmd
+            self.assertIn(
+                "-archives {}".format(self._config_json['archives']), cmd
+            )
 
         # Check the regulator stuff passed by the default constructor
         if self._config['verbose']:
-            assert "--verbose" in cmd
+            self.assertIn("--verbose", cmd)
 
         if self._config['num_mappers']:
-            assert "--num-mappers {}".format(
-                self._config['num_mappers']) in cmd
+            self.assertIn(
+                "--num-mappers {}".format(self._config['num_mappers']), cmd
+            )
 
         print(self._config['properties'])
         for key, value in self._config['properties'].items():
-            assert "-D {}={}".format(key, value) in cmd
+            self.assertIn("-D {}={}".format(key, value), cmd)
 
         # We don't have the sqoop binary available, and this is hard to mock,
         # so just accept an exception for now.
@@ -152,31 +158,31 @@ class TestSqoopHook(unittest.TestCase):
                 relaxed_isolation=self._config_export['relaxed_isolation'])
         )
 
-        assert "--input-null-string {}".format(
-            self._config_export['input_null_string']) in cmd
-        assert "--input-null-non-string {}".format(
-            self._config_export['input_null_non_string']) in cmd
-        assert "--staging-table {}".format(
-            self._config_export['staging_table']) in cmd
-        assert "--enclosed-by {}".format(
-            self._config_export['enclosed_by']) in cmd
-        assert "--escaped-by {}".format(
-            self._config_export['escaped_by']) in cmd
-        assert "--input-fields-terminated-by {}".format(
-            self._config_export['input_fields_terminated_by']) in cmd
-        assert "--input-lines-terminated-by {}".format(
-            self._config_export['input_lines_terminated_by']) in cmd
-        assert "--input-optionally-enclosed-by {}".format(
-            self._config_export['input_optionally_enclosed_by']) in cmd
+        self.assertIn("--input-null-string {}".format(
+            self._config_export['input_null_string']), cmd)
+        self.assertIn("--input-null-non-string {}".format(
+            self._config_export['input_null_non_string']), cmd)
+        self.assertIn("--staging-table {}".format(
+            self._config_export['staging_table']), cmd)
+        self.assertIn("--enclosed-by {}".format(
+            self._config_export['enclosed_by']), cmd)
+        self.assertIn("--escaped-by {}".format(
+            self._config_export['escaped_by']), cmd)
+        self.assertIn("--input-fields-terminated-by {}".format(
+            self._config_export['input_fields_terminated_by']), cmd)
+        self.assertIn("--input-lines-terminated-by {}".format(
+            self._config_export['input_lines_terminated_by']), cmd)
+        self.assertIn("--input-optionally-enclosed-by {}".format(
+            self._config_export['input_optionally_enclosed_by']), cmd)
 
         if self._config_export['clear_staging_table']:
-            assert "--clear-staging-table" in cmd
+            self.assertIn("--clear-staging-table", cmd)
 
         if self._config_export['batch']:
-            assert "--batch" in cmd
+            self.assertIn("--batch", cmd)
 
         if self._config_export['relaxed_isolation']:
-            assert "--relaxed-isolation" in cmd
+            self.assertIn("--relaxed-isolation", cmd)
 
     def test_import_cmd(self):
         hook = SqoopHook()
@@ -192,26 +198,30 @@ class TestSqoopHook(unittest.TestCase):
         )
 
         if self._config_import['append']:
-            assert '--append' in cmd
+            self.assertIn('--append', cmd)
 
         if self._config_import['direct']:
-            assert '--direct' in cmd
+            self.assertIn('--direct', cmd)
 
-        assert '--target-dir {}'.format(
-            self._config_import['target_dir']) in cmd
+        self.assertIn('--target-dir {}'.format(
+            self._config_import['target_dir']), cmd)
 
-        assert '--driver {}'.format(self._config_import['driver']) in cmd
-        assert '--split-by {}'.format(self._config_import['split_by']) in cmd
+        self.assertIn('--driver {}'.format(self._config_import['driver']), cmd)
+        self.assertIn('--split-by {}'.format(self._config_import['split_by']),
+                      cmd)
 
     def test_get_export_format_argument(self):
         hook = SqoopHook()
-        assert "--as-avrodatafile" in hook._get_export_format_argument('avro')
-        assert "--as-parquetfile" in hook._get_export_format_argument(
-            'parquet')
-        assert "--as-sequencefile" in hook._get_export_format_argument(
-            'sequence')
-        assert "--as-textfile" in hook._get_export_format_argument('text')
-        assert "--as-textfile" in hook._get_export_format_argument('unknown')
+        self.assertIn("--as-avrodatafile",
+                      hook._get_export_format_argument('avro'))
+        self.assertIn("--as-parquetfile",
+                      hook._get_export_format_argument('parquet'))
+        self.assertIn("--as-sequencefile",
+                      hook._get_export_format_argument('sequence'))
+        self.assertIn("--as-textfile",
+                      hook._get_export_format_argument('text'))
+        with self.assertRaises(AirflowException):
+            hook._get_export_format_argument('unknown')
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2ef4dbbe/tests/contrib/operators/test_sqoop_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sqoop_operator.py b/tests/contrib/operators/test_sqoop_operator.py
index a46dc93..679b22e 100644
--- a/tests/contrib/operators/test_sqoop_operator.py
+++ b/tests/contrib/operators/test_sqoop_operator.py
@@ -18,6 +18,7 @@ import unittest
 
 from airflow import DAG, configuration
 from airflow.contrib.operators.sqoop_operator import SqoopOperator
+from airflow.exceptions import AirflowException
 
 
 class TestSqoopOperator(unittest.TestCase):
@@ -88,6 +89,12 @@ class TestSqoopOperator(unittest.TestCase):
         self.assertEqual(self._config['driver'], operator.driver)
         self.assertEqual(self._config['properties'], operator.properties)
 
+    def test_invalid_cmd_type(self):
+        operator = SqoopOperator(task_id='sqoop_job', dag=self.dag,
+                                 cmd_type='invalid')
+        with self.assertRaises(AirflowException):
+            operator.execute({})
+
 
 if __name__ == '__main__':
     unittest.main()