You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/25 21:38:56 UTC
incubator-airflow git commit: [AIRFLOW-2006] Add local log catching to kubernetes operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 3dbfdafd7 -> 55f267492
[AIRFLOW-2006] Add local log catching to kubernetes operator
Closes #2947 from dimberman/AIRFLOW-2006-kubernetes-log-aggregation
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/55f26749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/55f26749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/55f26749
Branch: refs/heads/master
Commit: 55f2674925d391e4f3930b24ca65d2cb65fc25c3
Parents: 3dbfdaf
Author: Daniel Imberman <da...@gmail.com>
Authored: Thu Jan 25 22:37:50 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Jan 25 22:37:52 2018 +0100
----------------------------------------------------------------------
airflow/contrib/kubernetes/pod_launcher.py | 22 +++++++---
.../operators/kubernetes_pod_operator.py | 8 +++-
.../example_dags/example_kubernetes_operator.py | 42 ++++++++++++++++++++
.../test_kubernetes_pod_operator.py | 18 ++++++++-
tests/core.py | 2 +-
tests/jobs.py | 1 +
6 files changed, 83 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index 51f443b..a765986 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -54,7 +54,7 @@ class PodLauncher(LoggingMixin):
raise
return resp
- def run_pod(self, pod, startup_timeout=120):
+ def run_pod(self, pod, startup_timeout=120, get_logs=True):
# type: (Pod) -> State
"""
Launches the pod synchronously and waits for completion.
@@ -74,15 +74,25 @@ class PodLauncher(LoggingMixin):
time.sleep(1)
self.log.debug('Pod not yet started')
- final_status = self._monitor_pod(pod)
+ final_status = self._monitor_pod(pod, get_logs)
return final_status
- def _monitor_pod(self, pod):
+ def _monitor_pod(self, pod, get_logs):
# type: (Pod) -> State
- while self.pod_is_running(pod):
- self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING))
- time.sleep(2)
+ if get_logs:
+ logs = self._client.read_namespaced_pod_log(
+ name=pod.name,
+ namespace=pod.namespace,
+ follow=True,
+ tail_lines=10,
+ _preload_content=False)
+ for line in logs:
+ self.log.info(line)
+ else:
+ while self.pod_is_running(pod):
+ self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING))
+ time.sleep(2)
return self._task_status(self.read_pod(pod))
def _task_status(self, event):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 5d03875..82dfa52 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -43,12 +43,16 @@ class KubernetesPodOperator(BaseOperator):
)
launcher = pod_launcher.PodLauncher(client)
- final_state = launcher.run_pod(pod, self.startup_timeout_seconds)
+ final_state = launcher.run_pod(
+ pod,
+ startup_timeout=self.startup_timeout_seconds,
+ get_logs=self.get_logs)
if final_state != State.SUCCESS:
raise AirflowException("Pod returned a failure")
except AirflowException as ex:
raise AirflowException("Pod Launching failed: {error}".format(error=ex))
+
@apply_defaults
def __init__(self,
namespace,
@@ -60,6 +64,7 @@ class KubernetesPodOperator(BaseOperator):
labels=None,
startup_timeout_seconds=120,
kube_executor_config=None,
+ get_logs=True,
*args,
**kwargs):
super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -72,3 +77,4 @@ class KubernetesPodOperator(BaseOperator):
self.startup_timeout_seconds = startup_timeout_seconds
self.name = name
self.in_cluster = in_cluster
+ self.get_logs = get_logs
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/airflow/example_dags/example_kubernetes_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_kubernetes_operator.py b/airflow/example_dags/example_kubernetes_operator.py
new file mode 100644
index 0000000..9b86321
--- /dev/null
+++ b/airflow/example_dags/example_kubernetes_operator.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import airflow
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow.models import DAG
+
+args = {
+ 'owner': 'airflow',
+ 'start_date': airflow.utils.dates.days_ago(2)
+}
+
+dag = DAG(
+ dag_id='example_kubernetes_operator',
+ default_args=args,
+ schedule_interval=None)
+
+k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="airflow-test-pod",
+ in_cluster=False,
+ task_id="task",
+ get_logs=True,
+ dag=dag
+ )
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
index 4bbde8f..321f01f 100644
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -19,7 +19,8 @@ import unittest
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import AirflowException
from subprocess import check_call
-
+import mock
+from airflow.contrib.kubernetes.pod_launcher import PodLauncher
try:
check_call(["kubectl", "get", "pods"])
@@ -31,7 +32,6 @@ except Exception as e:
class KubernetesPodOperatorTest(unittest.TestCase):
-
def test_working_pod(self):
k = KubernetesPodOperator(namespace='default',
image="ubuntu:16.04",
@@ -44,6 +44,20 @@ class KubernetesPodOperatorTest(unittest.TestCase):
k.execute(None)
+ def test_logging(self):
+ with mock.patch.object(PodLauncher, 'log') as mock_logger:
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ get_logs=True
+ )
+ k.execute(None)
+ mock_logger.info.assert_any_call("+ echo\n")
+
def test_faulty_image(self):
bad_image_name = "foobar"
k = KubernetesPodOperator(namespace='default',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index f25d0e7..b79c1ff 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -66,7 +66,7 @@ from jinja2 import UndefinedError
import six
-NUM_EXAMPLE_DAGS = 18
+NUM_EXAMPLE_DAGS = 19
DEV_NULL = '/dev/null'
TEST_DAG_FOLDER = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'dags')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e522de5..aa78721 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -156,6 +156,7 @@ class BackfillJobTest(unittest.TestCase):
'example_trigger_target_dag',
'example_trigger_controller_dag', # tested above
'test_utils', # sleeps forever
+ 'example_kubernetes_operator', # only works with k8s cluster
]
logger = logging.getLogger('BackfillJobTest.test_backfill_examples')