You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/12 14:08:36 UTC
incubator-airflow git commit: [AIRFLOW-1995][Airflow 1995] add on_kill method to SqoopOperator
Repository: incubator-airflow
Updated Branches:
refs/heads/master eb994d683 -> 147472b99
[AIRFLOW-1995][Airflow 1995] add on_kill method to SqoopOperator
Closes #2936 from Acehaidrey/AIRFLOW-1995
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/147472b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/147472b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/147472b9
Branch: refs/heads/master
Commit: 147472b99babdbc67e63f784968864703e168562
Parents: eb994d6
Author: Ace Haidrey <ah...@pandora.com>
Authored: Fri Jan 12 15:08:29 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 12 15:08:29 2018 +0100
----------------------------------------------------------------------
airflow/contrib/hooks/sqoop_hook.py | 15 ++++++-----
airflow/contrib/operators/sqoop_operator.py | 34 +++++++++++++++---------
2 files changed, 30 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/147472b9/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 578b527..102b599 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -88,21 +88,22 @@ class SqoopHook(BaseHook, LoggingMixin):
:param kwargs: extra arguments to Popen (see subprocess.Popen)
:return: handle to subprocess
"""
- self.log.info("Executing command: {}".format(' '.join(self.cmd_mask_password(cmd))))
- sp = subprocess.Popen(cmd,
+ masked_cmd = ' '.join(self.cmd_mask_password(cmd))
+ self.log.info("Executing command: {}".format(masked_cmd))
+ self.sp = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
**kwargs)
- for line in iter(sp.stdout):
+ for line in iter(self.sp.stdout):
self.log.info(line.strip())
- sp.wait()
+ self.sp.wait()
- self.log.info("Command exited with return code %s", sp.returncode)
+ self.log.info("Command exited with return code %s", self.sp.returncode)
- if sp.returncode:
- raise AirflowException("Sqoop command failed: {}".format(' '.join(self.cmd_mask_password(cmd))))
+ if self.sp.returncode:
+ raise AirflowException("Sqoop command failed: {}".format(masked_cmd))
def _prepare_command(self, export=False):
sqoop_cmd_type = "export" if export else "import"
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/147472b9/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index cdaf336..cc0d646 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -16,6 +16,8 @@
"""
This module contains a sqoop 1 operator
"""
+import os
+import signal
from airflow.contrib.hooks.sqoop_hook import SqoopHook
from airflow.exceptions import AirflowException
@@ -148,22 +150,24 @@ class SqoopOperator(BaseOperator):
self.hcatalog_table = hcatalog_table
self.create_hcatalog_table = create_hcatalog_table
self.properties = properties
- self.extra_import_options = extra_import_options
- self.extra_export_options = extra_export_options
+ self.extra_import_options = extra_import_options or {}
+ self.extra_export_options = extra_export_options or {}
def execute(self, context):
"""
Execute sqoop job
"""
- hook = SqoopHook(conn_id=self.conn_id,
- verbose=self.verbose,
- num_mappers=self.num_mappers,
- hcatalog_database=self.hcatalog_database,
- hcatalog_table=self.hcatalog_table,
- properties=self.properties)
+ self.hook = SqoopHook(
+ conn_id=self.conn_id,
+ verbose=self.verbose,
+ num_mappers=self.num_mappers,
+ hcatalog_database=self.hcatalog_database,
+ hcatalog_table=self.hcatalog_table,
+ properties=self.properties
+ )
if self.cmd_type == 'export':
- hook.export_table(
+ self.hook.export_table(
table=self.table,
export_dir=self.export_dir,
input_null_string=self.input_null_string,
@@ -185,10 +189,12 @@ class SqoopOperator(BaseOperator):
self.extra_import_options['create-hcatalog-table'] = ''
if self.table and self.query:
- raise AirflowException('Cannot specify query and table together. Need to specify either or.')
+ raise AirflowException(
+ 'Cannot specify query and table together. Need to specify either or.'
+ )
if self.table:
- hook.import_table(
+ self.hook.import_table(
table=self.table,
target_dir=self.target_dir,
append=self.append,
@@ -200,7 +206,7 @@ class SqoopOperator(BaseOperator):
driver=self.driver,
extra_import_options=self.extra_import_options)
elif self.query:
- hook.import_query(
+ self.hook.import_query(
query=self.query,
target_dir=self.target_dir,
append=self.append,
@@ -215,3 +221,7 @@ class SqoopOperator(BaseOperator):
)
else:
raise AirflowException("cmd_type should be 'import' or 'export'")
+
+ def on_kill(self):
+ self.log.info('Sending SIGTERM signal to bash process group')
+ os.killpg(os.getpgid(self.hook.sp.pid), signal.SIGTERM)