You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/12 14:08:36 UTC

incubator-airflow git commit: [AIRFLOW-1995][Airflow 1995] add on_kill method to SqoopOperator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master eb994d683 -> 147472b99


[AIRFLOW-1995][Airflow 1995] add on_kill method to SqoopOperator

Closes #2936 from Acehaidrey/AIRFLOW-1995


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/147472b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/147472b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/147472b9

Branch: refs/heads/master
Commit: 147472b99babdbc67e63f784968864703e168562
Parents: eb994d6
Author: Ace Haidrey <ah...@pandora.com>
Authored: Fri Jan 12 15:08:29 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 12 15:08:29 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/sqoop_hook.py         | 15 ++++++-----
 airflow/contrib/operators/sqoop_operator.py | 34 +++++++++++++++---------
 2 files changed, 30 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/147472b9/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 578b527..102b599 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -88,21 +88,22 @@ class SqoopHook(BaseHook, LoggingMixin):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         :return: handle to subprocess
         """
-        self.log.info("Executing command: {}".format(' '.join(self.cmd_mask_password(cmd))))
-        sp = subprocess.Popen(cmd,
+        masked_cmd = ' '.join(self.cmd_mask_password(cmd))
+        self.log.info("Executing command: {}".format(masked_cmd))
+        self.sp = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               **kwargs)
 
-        for line in iter(sp.stdout):
+        for line in iter(self.sp.stdout):
             self.log.info(line.strip())
 
-        sp.wait()
+        self.sp.wait()
 
-        self.log.info("Command exited with return code %s", sp.returncode)
+        self.log.info("Command exited with return code %s", self.sp.returncode)
 
-        if sp.returncode:
-            raise AirflowException("Sqoop command failed: {}".format(' '.join(self.cmd_mask_password(cmd))))
+        if self.sp.returncode:
+            raise AirflowException("Sqoop command failed: {}".format(masked_cmd))
 
     def _prepare_command(self, export=False):
         sqoop_cmd_type = "export" if export else "import"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/147472b9/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index cdaf336..cc0d646 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -16,6 +16,8 @@
 """
 This module contains a sqoop 1 operator
 """
+import os
+import signal
 
 from airflow.contrib.hooks.sqoop_hook import SqoopHook
 from airflow.exceptions import AirflowException
@@ -148,22 +150,24 @@ class SqoopOperator(BaseOperator):
         self.hcatalog_table = hcatalog_table
         self.create_hcatalog_table = create_hcatalog_table
         self.properties = properties
-        self.extra_import_options = extra_import_options
-        self.extra_export_options = extra_export_options
+        self.extra_import_options = extra_import_options or {}
+        self.extra_export_options = extra_export_options or {}
 
     def execute(self, context):
         """
         Execute sqoop job
         """
-        hook = SqoopHook(conn_id=self.conn_id,
-                         verbose=self.verbose,
-                         num_mappers=self.num_mappers,
-                         hcatalog_database=self.hcatalog_database,
-                         hcatalog_table=self.hcatalog_table,
-                         properties=self.properties)
+        self.hook = SqoopHook(
+            conn_id=self.conn_id,
+            verbose=self.verbose,
+            num_mappers=self.num_mappers,
+            hcatalog_database=self.hcatalog_database,
+            hcatalog_table=self.hcatalog_table,
+            properties=self.properties
+        )
 
         if self.cmd_type == 'export':
-            hook.export_table(
+            self.hook.export_table(
                 table=self.table,
                 export_dir=self.export_dir,
                 input_null_string=self.input_null_string,
@@ -185,10 +189,12 @@ class SqoopOperator(BaseOperator):
                 self.extra_import_options['create-hcatalog-table'] = ''
 
             if self.table and self.query:
-                raise AirflowException('Cannot specify query and table together. Need to specify either or.')
+                raise AirflowException(
+                    'Cannot specify query and table together. Need to specify either or.'
+                )
 
             if self.table:
-                hook.import_table(
+                self.hook.import_table(
                     table=self.table,
                     target_dir=self.target_dir,
                     append=self.append,
@@ -200,7 +206,7 @@ class SqoopOperator(BaseOperator):
                     driver=self.driver,
                     extra_import_options=self.extra_import_options)
             elif self.query:
-                hook.import_query(
+                self.hook.import_query(
                     query=self.query,
                     target_dir=self.target_dir,
                     append=self.append,
@@ -215,3 +221,7 @@ class SqoopOperator(BaseOperator):
                 )
         else:
             raise AirflowException("cmd_type should be 'import' or 'export'")
+
+    def on_kill(self):
+        self.log.info('Sending SIGTERM signal to bash process group')
+        os.killpg(os.getpgid(self.hook.sp.pid), signal.SIGTERM)