Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/07 06:10:52 UTC

[GitHub] [airflow] nullhack opened a new pull request #9165: Fix DockerOperator xcom

nullhack opened a new pull request #9165:
URL: https://github.com/apache/airflow/pull/9165


   Solves the issue https://github.com/apache/airflow/issues/9164
   
   **File changed**: `airflow/providers/docker/operators/docker.py`
   
   **Problems**
   
   * **Issue 1**: When `xcom_push=True` is enabled (and `xcom_push_all=False`), the output is **sometimes null** or captured incorrectly.
   * **Issue 2**: When `xcom_push_all=True`, a bytes string (`b'...'`) is stored as the xcom. This makes the output harder to use in downstream operators and does not conform with other operators.
   * **Issue 3**: `stderr` and `stdout` are written to the same output xcom. In practice, we don't want warnings and errors mixed into the output that downstream operators parse (although we still need to capture them in the Airflow logs). Sending `stderr` to xcom can lead to undefined/non-deterministic behavior.
   
   **Solutions**:
   
   * **Issue 1**: The problem is that [attach](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L241) starts before [wait](https://github.com/apache/airflow/blob/master/airflow/providers/docker/operators/docker.py#L255). However, running `wait` before `attach` with `stream=True` seems to freeze and return no output (I did not dig deep, but it will probably become an issue on the `docker python SDK` later). A simple solution is to call `logs`, as it is a [wrapper for attach](https://github.com/docker/docker-py/blob/master/docker/api/container.py#L22). I followed this approach and refactored part of the code: instead of making two calls, I save the output in a list after `wait`:
   
   ```python
               self.cli.start(self.container["Id"])
   
               result = self.cli.wait(self.container["Id"])
   
               lines = [
                   l.decode("utf-8").strip() if hasattr(l, "decode") else str(l)
                   for l in self.cli.logs(
                       self.container["Id"], stream=True, stdout=True, stderr=False
                   )
               ]
   ```
   Why a list, not a generator?
   Because I may need to use it twice, and a generator can be consumed only once per request:
   ```python
               for line in lines:
                   self.log.info(line)
      
               ...
   
               if self.xcom_push_flag:
                   return "\n".join(lines) if self.xcom_all else line
   ```
   So I decided to keep it as a list to make the code less complex, and to avoid repeating the request as the original code does.
   * **Issue 2**:  I just added:
   ```python
   ...
   l.decode("utf-8").strip() if hasattr(l, "decode") else str(l)
   ...
   ```
   This ensures that every line is decoded if possible; if not, its string representation is returned (DockerOperator always returns a bytes string, but if other objects become possible in the future, this keeps it robust against errors).
   **I expect that this part can cause some discussion**, because I'm changing the output from `bytes` to `string` (my personal opinion is that returning `bytes` is a problem and it should be changed to `string`, as in BashOperator). There's no documentation about what value DockerOperator should store as xcom. If this is a big issue, I suggest adding a flag (`xcom_string=False`), though personally I don't like this approach.
   * **Issue 3**:  I created a separate list that only stores `stderr`:
   ```python
               warnings = [
                   w.decode("utf-8").strip() if hasattr(w, "decode") else str(w)
                   for w in self.cli.logs(
                       self.container["Id"], stream=True, stdout=False, stderr=True
                   )
               ]
   
               for warning in warnings:
                   self.log.warning(warning)
   ```
   In the end, only the `stdout` log lines are sent to xcom, and if any error is raised, the DAG run fails:
   ```python
               if result["StatusCode"] != 0:
                   raise AirflowException("docker container failed: " + repr(result))
   
               if self.xcom_push_flag:
                   return "\n".join(lines) if self.xcom_all else line
   ```
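
As a rough, self-contained illustration of the decoding and return-value logic described above (not the PR's actual code), the sketch below uses plain byte strings in place of the output of `self.cli.logs(...)`. The helper names `decode_lines` and `xcom_value` and the sample data are hypothetical, and returning `None` for empty output is an assumption of this sketch rather than something the PR specifies.

```python
from typing import Iterable, List, Optional, Union


def decode_lines(chunks: Iterable[Union[bytes, str]]) -> List[str]:
    """Decode bytes chunks to stripped str; other values fall back to str()."""
    return [
        c.decode("utf-8").strip() if hasattr(c, "decode") else str(c)
        for c in chunks
    ]


def xcom_value(lines: List[str], xcom_all: bool) -> Optional[str]:
    """Return every line joined by newlines, or only the last line."""
    if not lines:
        # Assumption of this sketch: no output means nothing to push.
        return None
    return "\n".join(lines) if xcom_all else lines[-1]


# Made-up stdout chunks standing in for a container's log stream.
stdout_chunks = [b"7\n", b"8\n", b"9\n"]
lines = decode_lines(stdout_chunks)

print(xcom_value(lines, xcom_all=False))  # last line only
print(xcom_value(lines, xcom_all=True))   # all lines, newline-separated
```

Materializing the list once is what allows the same lines to be both logged and returned without a second request, at the cost of holding the whole output in memory.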
   
   **Sample Output**
   
   * **Issue 1**:
     * Captured correctly:
   ![image](https://user-images.githubusercontent.com/11466701/83961479-1060ed00-a8c6-11ea-9fb8-860c8b60107b.png)
   `We would expect only the last line '9'`
   * **Issue 2**:
     * Conform with similar operators
   ![image](https://user-images.githubusercontent.com/11466701/83961584-1e633d80-a8c7-11ea-9fd6-783b26ffa1a5.png)
   
     * Easier to pull outputs and use on other operators
   ![image](https://user-images.githubusercontent.com/11466701/83961601-3dfa6600-a8c7-11ea-805c-a2471887eaac.png)
   
   
   * **Issue 3**:
     * `stderr` does not interfere with the output
   ![image](https://user-images.githubusercontent.com/11466701/83961626-89ad0f80-a8c7-11ea-9cba-bd64f81fe34a.png)
   `Note that newlines are not shown in the UI, but if the value is used in another operator the output will be correct`
     * deterministic behavior
   ![image](https://user-images.githubusercontent.com/11466701/83961661-ead4e300-a8c7-11ea-8b7e-59a95eb57c92.png)
   
   ---
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes) `unnecessary`
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions. `unnecessary`
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] nullhack edited a comment on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
nullhack edited a comment on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-640226947


   @mik-laj Thank you for the suggestions. I wrote an example on the issue itself (https://github.com/apache/airflow/issues/9164) if you need it for testing now.
   
   I'll write the example as you asked, but I would prefer to do it once we agree on whether the current code is acceptable or needs changes, to avoid modifying the example every time a change is required.
   
   About the `KubernetesPodOperator`: I haven't used it; my workflow is based on Docker Swarm. Can you describe the difference in behavior you mentioned earlier?
   
   Apart from that, there's one *check* that was not successful. I tried to read it, but it doesn't seem related to my changes (at least not directly). Can you (or any other member reading this) confirm whether it is?
   **Edit**: I can confirm that the failed test is not related to this PR. Other [PRs](https://github.com/apache/airflow/pull/9174) with unrelated code are failing on the same test, `CI Build / Quarantined:Pg9.6,Py3.6 (pull_request)`.





[GitHub] [airflow] mik-laj commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-640182089


   @retornam  @CodingJonas  @CaptainCuddleCube @nalepae @akki  Can I ask for review? This change looks valuable, but requires expert knowledge about the Docker/Docker API.





[GitHub] [airflow] nullhack commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
nullhack commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-640481723


   > In kubernetes, you must create a file and this file will be saved to xcom.
   
   Yes, the current DockerOperator is different. It reads logs from `self.client.logs` instead. I think something in between would be ideal: the return value is based on files, but the default file returned is `stdout`.
   
   > ```
   > mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
   > ```
   > 
   > In this way, the logs can still contain useful information for the developer.
   
   Yes, I like this approach. One problem I see is that `/airflow/xcom/return.json` is [fixed](https://github.com/apache/airflow/blob/master/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L136). I think it would be more flexible if the path could be changed (defaulting to `stdout`).
   





[GitHub] [airflow] ashb commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-640442214


   @nullhack In cases like this (where you are opening a PR), creating the issue beforehand is entirely optional -- you can just open a PR and describe the bug in the PR message; no issue needed.





[GitHub] [airflow] nullhack closed pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
nullhack closed pull request #9165:
URL: https://github.com/apache/airflow/pull/9165


   





[GitHub] [airflow] nullhack commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
nullhack commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-640469512


   > @nullhack In cases like this (where you are opening a PR), creating the issue beforehand is entirely optional -- you can just open a PR and describe the bug in the PR message; no issue needed.
   
   Noted. I didn't know about that, thanks!





[GitHub] [airflow] nullhack commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
nullhack commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-647116088


   Thank you @mik-laj, I was trying to find a solution for this. I will use the reflog.








[GitHub] [airflow] akki commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
akki commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-643606357


   @mik-laj Thanks for drawing my attention to this.
   @nullhack I have left some questions/suggestions as review comments on these changes; I hope they make sense to you and are useful.





[GitHub] [airflow] mik-laj commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-640427194


   > About the KubernetesPodOperator: I haven't used it; my workflow is based on Docker Swarm. Can you describe the difference in behavior you mentioned earlier?
   
   In Kubernetes, you must create a file, and the contents of this file will be saved to xcom.
   ```
   mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
   ```
   In this way, the logs can still contain useful information for the developer.
   
   > Apart from that, there's one check that was not successful. I tried to read it, but it doesn't seem related to my changes (at least not directly). Can you (or any other member reading this) confirm whether it is?
   
   Quarantined tests are in quarantine because they are flaky. Please ignore this failure. We want to fix them soon.
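
The file-based convention described above can be sketched in plain Python. This is only an illustration of the idea (the result goes to a JSON file, while the logs stay free for human-readable messages); it is not how `KubernetesPodOperator` is implemented. The helpers `write_xcom` and `read_xcom` are hypothetical, and a temporary directory stands in for `/airflow/xcom/`.

```python
import json
import tempfile
from pathlib import Path


def write_xcom(xcom_dir: Path, value) -> Path:
    """What the task would do: serialize its result to return.json."""
    xcom_dir.mkdir(parents=True, exist_ok=True)
    target = xcom_dir / "return.json"
    target.write_text(json.dumps(value))
    return target


def read_xcom(xcom_dir: Path):
    """What the orchestrator would do: load the result from the file."""
    return json.loads((xcom_dir / "return.json").read_text())


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for /airflow/xcom/ inside the container.
    xcom_dir = Path(tmp) / "airflow" / "xcom"
    print("log line for the developer, not part of the result")
    write_xcom(xcom_dir, [1, 2, 3, 4])
    result = read_xcom(xcom_dir)

print(result)
```

Because the result never passes through stdout, log output and return value cannot interfere with each other.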








[GitHub] [airflow] akki commented on a change in pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
akki commented on a change in pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#discussion_r439729529



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -243,30 +245,39 @@ def _run_image(self):
                 tty=self.tty,
             )
 
-            lines = self.cli.attach(container=self.container['Id'],
-                                    stdout=True,
-                                    stderr=True,
-                                    stream=True)
+            self.cli.start(self.container["Id"])
 
-            self.cli.start(self.container['Id'])
+            result = self.cli.wait(self.container["Id"])
+
+            lines = [
+                templ.decode("utf-8").strip() if hasattr(templ, "decode") else str(templ)
+                for templ in self.cli.logs(
+                    self.container["Id"], stream=True, stdout=True, stderr=False
+                )
+            ]
 
             line = ''
             for line in lines:
-                line = line.strip()
-                if hasattr(line, 'decode'):
-                    line = line.decode('utf-8')
                 self.log.info(line)
 
-            result = self.cli.wait(self.container['Id'])
-            if result['StatusCode'] != 0:
-                raise AirflowException('docker container failed: ' + repr(result))
+            warnings = [
+                tempw.decode("utf-8").strip() if hasattr(tempw, "decode") else str(tempw)
+                for tempw in self.cli.logs(
+                    self.container["Id"], stream=True, stdout=False, stderr=True
+                )
+            ]
+
+            for warning in warnings:
+                self.log.warning(warning)
+
+            if result["StatusCode"] != 0:
+                raise AirflowException("docker container failed: " + repr(result))
 
-            # duplicated conditional logic because of expensive operation
             ret = None
             if self.do_xcom_push:
-                ret = self.cli.logs(container=self.container['Id']) \
-                    if self.xcom_all else line.encode('utf-8')
+                ret = "\n".join(lines) if self.xcom_all else line
 
+            self.cli.stop(self.container['Id'])

Review comment:
       I have a couple of questions here:
     1. Why does Airflow need to explicitly stop the Docker container?
     2. If I understand correctly, the [`.wait()` API](https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.wait) (which we are already calling above) blocks until the container stops. So why would the container still be running at this point, and why is this call needed?
   
   If there is a reason that justifies this call, I would suggest adding some logging here (at warning level?) telling the user that the container was forcefully stopped by Airflow, so that it is easy for them to debug.







[GitHub] [airflow] mik-laj commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-640212046


   @nullhack Can you add an example of how to use DockerOperator and XCOM? 
   https://github.com/apache/airflow/blob/master/airflow/providers/docker/example_dags/example_docker.py
   If I understand correctly, this behavior is different from KubernetesPodOperator? 
   https://github.com/apache/airflow/blob/34d1441f0a0eae32a398d85ae95191f7a074e367/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py#L67-L77





[GitHub] [airflow] akki commented on a change in pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
akki commented on a change in pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#discussion_r439728911



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -243,30 +245,39 @@ def _run_image(self):
                 tty=self.tty,
             )
 
-            lines = self.cli.attach(container=self.container['Id'],
-                                    stdout=True,
-                                    stderr=True,
-                                    stream=True)
+            self.cli.start(self.container["Id"])
 
-            self.cli.start(self.container['Id'])
+            result = self.cli.wait(self.container["Id"])

Review comment:
       Please correct me if I am wrong, but if we `.wait()` here, I understand that Airflow won't show the application logs until the Docker container stops?
   
   For long-running applications (which typically run for an hour or more), I think it is crucial to see their logs in near-real-time, but with this change I think we won't see any logs until the container process completes. Sorry if I am missing something here.
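
To make the concern concrete, here is a minimal sketch of consuming a log stream incrementally, so that each line is logged as soon as it arrives while only the most recent line is remembered. It does not call the Docker SDK: `fake_stream` is a hypothetical stand-in for a streaming generator of bytes, and `log_while_streaming` is an illustrative helper, not code from the PR or from the original operator.

```python
import logging
from typing import Iterable, Iterator, Optional

log = logging.getLogger("docker_stream_sketch")


def fake_stream() -> Iterator[bytes]:
    """Hypothetical stand-in for a streaming log generator."""
    yield b"step 1\n"
    yield b"step 2\n"
    yield b"done\n"


def log_while_streaming(chunks: Iterable[bytes]) -> Optional[str]:
    """Log every line as it arrives and return only the last one."""
    last = None
    for chunk in chunks:
        last = chunk.decode("utf-8").strip()
        log.info(last)  # emitted immediately, not after the stream ends
    return last


last_line = log_while_streaming(fake_stream())
print(last_line)
```

With this shape the consumer never waits for the producer to finish before emitting the first log line, which is the property the comment above is asking about.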







[GitHub] [airflow] akki commented on a change in pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
akki commented on a change in pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#discussion_r439728527



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -243,30 +245,39 @@ def _run_image(self):
                 tty=self.tty,
             )
 
-            lines = self.cli.attach(container=self.container['Id'],

Review comment:
       I think the [`self.cli.attach` method](https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.attach) with `stream=True` returns a generator which allows you to read the logs in a "streaming" fashion without using significant memory.

##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -243,30 +245,39 @@ def _run_image(self):
                 tty=self.tty,
             )
 
-            lines = self.cli.attach(container=self.container['Id'],
-                                    stdout=True,
-                                    stderr=True,
-                                    stream=True)
+            self.cli.start(self.container["Id"])
 
-            self.cli.start(self.container['Id'])
+            result = self.cli.wait(self.container["Id"])
+
+            lines = [
+                templ.decode("utf-8").strip() if hasattr(templ, "decode") else str(templ)
+                for templ in self.cli.logs(
+                    self.container["Id"], stream=True, stdout=True, stderr=False
+                )
+            ]

Review comment:
       So this will load the full logs of the application into memory, right? So if the application log is 1 GB in size, Airflow will use an extra 1 GB of RAM?
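
One possible way to bound memory while still keeping some output, shown purely as an illustration and not as something proposed in this thread, is to retain only the last N lines of the stream with `collections.deque(maxlen=N)`. The helper `tail_lines` and the generated sample data are hypothetical.

```python
from collections import deque
from typing import Deque, Iterable, List


def tail_lines(chunks: Iterable[bytes], max_lines: int) -> List[str]:
    """Consume the whole stream but keep at most max_lines decoded lines."""
    tail: Deque[str] = deque(maxlen=max_lines)
    for chunk in chunks:
        # Older entries are discarded automatically once the deque is full.
        tail.append(chunk.decode("utf-8").strip())
    return list(tail)


# A generator of 100,000 made-up log lines; it is never materialized in full.
many = (f"line {i}\n".encode("utf-8") for i in range(100_000))
kept = tail_lines(many, max_lines=3)
print(kept)
```

The trade-off is that "push all output" can no longer mean the complete log, only its tail.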







[GitHub] [airflow] mik-laj commented on pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#issuecomment-647115924


   @nullhack Could you rebase instead of merging changes next time? This allows us to see all your changes in the "Commits" tab, which makes reviews easier.
   Here is a guide about it: https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#id8
   If you want, you can now undo your merge using `git reflog` and try a rebase instead.





[GitHub] [airflow] akki commented on a change in pull request #9165: Fix DockerOperator xcom

Posted by GitBox <gi...@apache.org>.
akki commented on a change in pull request #9165:
URL: https://github.com/apache/airflow/pull/9165#discussion_r439730550



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -243,30 +245,39 @@ def _run_image(self):
                 tty=self.tty,
             )
 
-            lines = self.cli.attach(container=self.container['Id'],

Review comment:
       > So I decided to just keep as a list to make the code less complex. And do not repeat the request as the original code.
   
   I did read this in the description. I am guessing the original code makes the repeated request because of the tiny memory footprint of this API.



