You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/04 07:02:23 UTC
[27/50] incubator-airflow git commit: [AIRFLOW-2363] Fix return type bug in TaskHandler
[AIRFLOW-2363] Fix return type bug in TaskHandler
Closes #3259 from yrqls21/kevin_yang_fix_s3_logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/19b39012
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/19b39012
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/19b39012
Branch: refs/heads/v1-10-test
Commit: 19b3901284b9f7a9f9a898dd1a1e823e5109cfa1
Parents: 0ff434a
Author: Kevin Yang <ke...@airbnb.com>
Authored: Mon Apr 30 12:49:06 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Apr 30 12:49:20 2018 +0200
----------------------------------------------------------------------
airflow/bin/cli.py | 1 +
airflow/utils/log/gcs_task_handler.py | 11 ++++++-----
airflow/utils/log/s3_task_handler.py | 9 ++++-----
airflow/utils/log/wasb_task_handler.py | 9 ++++-----
tests/utils/log/test_s3_task_handler.py | 12 ++++++++++--
5 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8a92cfa..f26cbe4 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -468,6 +468,7 @@ def run(args, dag=None):
else:
with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
_run(args, dag, ti)
+ logging.shutdown()
@cli_utils.action_logging
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index d4a9871..8c34792 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -113,13 +113,14 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
remote_log = self.gcs_read(remote_loc)
log = '*** Reading remote log from {}.\n{}\n'.format(
remote_loc, remote_log)
+ return log, {'end_of_log': True}
except Exception as e:
log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(
remote_loc, str(e))
self.log.error(log)
- log += super(GCSTaskHandler, self)._read(ti, try_number)
-
- return log, {'end_of_log': True}
+ local_log, metadata = super(GCSTaskHandler, self)._read(ti, try_number)
+ log += local_log
+ return log, metadata
def gcs_read(self, remote_log_location):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index f29a92f..07b9b3e 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -111,10 +111,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
remote_log = self.s3_read(remote_loc, return_error=True)
log = '*** Reading remote log from {}.\n{}\n'.format(
remote_loc, remote_log)
+ return log, {'end_of_log': True}
else:
- log = super(S3TaskHandler, self)._read(ti, try_number)
-
- return log, {'end_of_log': True}
+ return super(S3TaskHandler, self)._read(ti, try_number)
def s3_log_exists(self, remote_log_location):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/airflow/utils/log/wasb_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py
index 1a9590d..a2a0c0d 100644
--- a/airflow/utils/log/wasb_task_handler.py
+++ b/airflow/utils/log/wasb_task_handler.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -119,10 +119,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
remote_log = self.wasb_read(remote_loc, return_error=True)
log = '*** Reading remote log from {}.\n{}\n'.format(
remote_loc, remote_log)
+ return log, {'end_of_log': True}
else:
- log = super(WasbTaskHandler, self)._read(ti, try_number)
-
- return log, {'end_of_log': True}
+ return super(WasbTaskHandler, self)._read(ti, try_number)
def wasb_log_exists(self, remote_log_location):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19b39012/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index c287fbc..a5d5f15 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -117,6 +117,14 @@ class TestS3TaskHandler(unittest.TestCase):
'Log line\n\n'], [{'end_of_log': True}])
)
+ def test_read_when_s3_log_missing(self):
+ log, metadata = self.s3_task_handler.read(self.ti)
+
+ self.assertEqual(1, len(log))
+ self.assertEqual(len(log), len(metadata))
+ self.assertIn('*** Log file does not exist:', log[0])
+ self.assertEqual({'end_of_log': True}, metadata[0])
+
def test_read_raises_return_error(self):
handler = self.s3_task_handler
url = 's3://nonexistentbucket/foo'