You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/25 21:38:56 UTC

incubator-airflow git commit: [AIRFLOW-2006] Add local log catching to kubernetes operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3dbfdafd7 -> 55f267492


[AIRFLOW-2006] Add local log catching to kubernetes operator

Closes #2947 from dimberman/AIRFLOW-2006-kubernetes-log-aggregation


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/55f26749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/55f26749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/55f26749

Branch: refs/heads/master
Commit: 55f2674925d391e4f3930b24ca65d2cb65fc25c3
Parents: 3dbfdaf
Author: Daniel Imberman <da...@gmail.com>
Authored: Thu Jan 25 22:37:50 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Jan 25 22:37:52 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/kubernetes/pod_launcher.py      | 22 +++++++---
 .../operators/kubernetes_pod_operator.py        |  8 +++-
 .../example_dags/example_kubernetes_operator.py | 42 ++++++++++++++++++++
 .../test_kubernetes_pod_operator.py             | 18 ++++++++-
 tests/core.py                                   |  2 +-
 tests/jobs.py                                   |  1 +
 6 files changed, 83 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index 51f443b..a765986 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -54,7 +54,7 @@ class PodLauncher(LoggingMixin):
             raise
         return resp
 
-    def run_pod(self, pod, startup_timeout=120):
+    def run_pod(self, pod, startup_timeout=120, get_logs=True):
         # type: (Pod) -> State
         """
         Launches the pod synchronously and waits for completion.
@@ -74,15 +74,25 @@ class PodLauncher(LoggingMixin):
                 time.sleep(1)
             self.log.debug('Pod not yet started')
 
-        final_status = self._monitor_pod(pod)
+        final_status = self._monitor_pod(pod, get_logs)
         return final_status
 
-    def _monitor_pod(self, pod):
+    def _monitor_pod(self, pod, get_logs):
         # type: (Pod) -> State
 
-        while self.pod_is_running(pod):
-            self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING))
-            time.sleep(2)
+        if get_logs:
+            logs = self._client.read_namespaced_pod_log(
+                name=pod.name,
+                namespace=pod.namespace,
+                follow=True,
+                tail_lines=10,
+                _preload_content=False)
+            for line in logs:
+                self.log.info(line)
+        else:
+            while self.pod_is_running(pod):
+                self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING))
+                time.sleep(2)
         return self._task_status(self.read_pod(pod))
 
     def _task_status(self, event):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 5d03875..82dfa52 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -43,12 +43,16 @@ class KubernetesPodOperator(BaseOperator):
                                )
 
             launcher = pod_launcher.PodLauncher(client)
-            final_state = launcher.run_pod(pod, self.startup_timeout_seconds)
+            final_state = launcher.run_pod(
+                pod,
+                startup_timeout=self.startup_timeout_seconds,
+                get_logs=self.get_logs)
             if final_state != State.SUCCESS:
                 raise AirflowException("Pod returned a failure")
         except AirflowException as ex:
             raise AirflowException("Pod Launching failed: {error}".format(error=ex))
 
+
     @apply_defaults
     def __init__(self,
                  namespace,
@@ -60,6 +64,7 @@ class KubernetesPodOperator(BaseOperator):
                  labels=None,
                  startup_timeout_seconds=120,
                  kube_executor_config=None,
+                 get_logs=True,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -72,3 +77,4 @@ class KubernetesPodOperator(BaseOperator):
         self.startup_timeout_seconds = startup_timeout_seconds
         self.name = name
         self.in_cluster = in_cluster
+        self.get_logs = get_logs

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/airflow/example_dags/example_kubernetes_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_kubernetes_operator.py b/airflow/example_dags/example_kubernetes_operator.py
new file mode 100644
index 0000000..9b86321
--- /dev/null
+++ b/airflow/example_dags/example_kubernetes_operator.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import airflow
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow.models import DAG
+
+args = {
+    'owner': 'airflow',
+    'start_date': airflow.utils.dates.days_ago(2)
+}
+
+dag = DAG(
+    dag_id='example_kubernetes_operator',
+    default_args=args,
+    schedule_interval=None)
+
+k = KubernetesPodOperator(namespace='default',
+                          image="ubuntu:16.04",
+                          cmds=["bash", "-cx"],
+                          arguments=["echo", "10"],
+                          labels={"foo": "bar"},
+                          name="airflow-test-pod",
+                          in_cluster=False,
+                          task_id="task",
+                          get_logs=True,
+                          dag=dag
+                          )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
index 4bbde8f..321f01f 100644
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -19,7 +19,8 @@ import unittest
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow import AirflowException
 from subprocess import check_call
-
+import mock
+from airflow.contrib.kubernetes.pod_launcher import PodLauncher
 
 try:
     check_call(["kubectl", "get", "pods"])
@@ -31,7 +32,6 @@ except Exception as e:
 
 
 class KubernetesPodOperatorTest(unittest.TestCase):
-
     def test_working_pod(self):
         k = KubernetesPodOperator(namespace='default',
                                   image="ubuntu:16.04",
@@ -44,6 +44,20 @@ class KubernetesPodOperatorTest(unittest.TestCase):
 
         k.execute(None)
 
+    def test_logging(self):
+        with mock.patch.object(PodLauncher, 'log') as mock_logger:
+            k = KubernetesPodOperator(namespace='default',
+                                      image="ubuntu:16.04",
+                                      cmds=["bash", "-cx"],
+                                      arguments=["echo", "10"],
+                                      labels={"foo": "bar"},
+                                      name="test",
+                                      task_id="task",
+                                      get_logs=True
+                                      )
+            k.execute(None)
+            mock_logger.info.assert_any_call("+ echo\n")
+
     def test_faulty_image(self):
         bad_image_name = "foobar"
         k = KubernetesPodOperator(namespace='default',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index f25d0e7..b79c1ff 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -66,7 +66,7 @@ from jinja2 import UndefinedError
 
 import six
 
-NUM_EXAMPLE_DAGS = 18
+NUM_EXAMPLE_DAGS = 19
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/55f26749/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e522de5..aa78721 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -156,6 +156,7 @@ class BackfillJobTest(unittest.TestCase):
             'example_trigger_target_dag',
             'example_trigger_controller_dag',  # tested above
             'test_utils',  # sleeps forever
+            'example_kubernetes_operator',  # only works with k8s cluster
         ]
 
         logger = logging.getLogger('BackfillJobTest.test_backfill_examples')