Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/28 02:44:25 UTC

[GitHub] [airflow] ferruzzi opened a new pull request #21175: Fixes Docker xcom functionality

ferruzzi opened a new pull request #21175:
URL: https://github.com/apache/airflow/pull/21175


   closes: https://github.com/apache/airflow/pull/19027
   
   completes: https://github.com/apache/airflow/pull/19027/
   
   Co-Author: https://github.com/asaf400
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ferruzzi commented on a change in pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r796050520



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
             working_dir=self.working_dir,
             tty=self.tty,
         )
-        lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
         try:
             self.cli.start(self.container['Id'])
 
-            line = ''
-            res_lines = []
-            return_value = None
-            for line in lines:
-                if hasattr(line, 'decode'):
+            log_lines = []
+            for log_chunk in logstream:
+                if hasattr(log_chunk, 'decode'):
                     # Note that lines returned can also be byte sequences so we have to handle decode here
-                    line = line.decode('utf-8')
-                line = line.strip()
-                res_lines.append(line)
-                self.log.info(line)
+                    log_chunk = log_chunk.decode('utf-8')

Review comment:
       Added in https://github.com/apache/airflow/pull/21175/commits/12df34c1d94e0c9d7fed44784d594109b3a1820d
   
   Just making sure I understand: adding `surrogateescape` here allows our decode to pick up and fix anything that was escaped in the initial encode, assuming they used the same `surrogateescape` there. Is that right?
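A minimal sketch of the round trip the question is about, assuming the container emits a byte that is not valid UTF-8. The sample bytes and variable names are illustrative only and are not taken from the PR. As far as the standard codecs behave, the handler does not repair anything on decode; it carries each undecodable byte through as a lone surrogate, and encoding with the same handler restores the original byte.

```python
# Illustrative input: 0xff can never appear in valid UTF-8 text.
raw = b"progress: 50%\xff done"

# Decoding with errors="surrogateescape" maps each undecodable byte to a
# lone surrogate code point (0xff becomes U+DCFF) instead of raising.
text = raw.decode("utf-8", errors="surrogateescape")

# Encoding with the same handler turns the surrogate back into the
# original byte, so the round trip is lossless.
restored = text.encode("utf-8", errors="surrogateescape")
```

Note that the decoded string still contains the surrogate, so encoding it with the default strict handler would raise.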







[GitHub] [airflow] uranusjr commented on pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1024857545


   See https://github.com/apache/airflow/pull/19027#discussion_r795019065. I still think having a separate API call is safer.





[GitHub] [airflow] github-actions[bot] commented on pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1026507529


   The PR is likely OK to be merged with just a subset of tests for the default Python and Database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ferruzzi commented on a change in pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r795090462



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
             working_dir=self.working_dir,
             tty=self.tty,
         )
-        lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
         try:
             self.cli.start(self.container['Id'])
 
-            line = ''
-            res_lines = []
-            return_value = None
-            for line in lines:
-                if hasattr(line, 'decode'):
+            log_lines = []
+            for log_chunk in logstream:
+                if hasattr(log_chunk, 'decode'):
                     # Note that lines returned can also be byte sequences so we have to handle decode here
-                    line = line.decode('utf-8')
-                line = line.strip()
-                res_lines.append(line)
-                self.log.info(line)
+                    log_chunk = log_chunk.decode('utf-8')

Review comment:
       I haven't run into that before.  I'll read up on it and get the change in on Monday.







[GitHub] [airflow] ferruzzi commented on pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1024525090


   Rebased and corrected the co-author in the commit message.





[GitHub] [airflow] ferruzzi commented on a change in pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r796046005



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
             working_dir=self.working_dir,
             tty=self.tty,
         )
-        lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
         try:
             self.cli.start(self.container['Id'])
 
-            line = ''
-            res_lines = []
-            return_value = None
-            for line in lines:
-                if hasattr(line, 'decode'):
+            log_lines = []
+            for log_chunk in logstream:
+                if hasattr(log_chunk, 'decode'):
                     # Note that lines returned can also be byte sequences so we have to handle decode here
-                    line = line.decode('utf-8')
-                line = line.strip()
-                res_lines.append(line)
-                self.log.info(line)
+                    log_chunk = log_chunk.decode('utf-8')
+                log_chunk = log_chunk.strip()
+                log_lines.append(log_chunk)
+                self.log.info("%s", log_chunk)
+
             result = self.cli.wait(self.container['Id'])
             if result['StatusCode'] != 0:
-                res_lines = "\n".join(res_lines)
-                raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
-            if self.retrieve_output and not return_value:
-                return_value = self._attempt_to_retrieve_result()
-            ret = None
+                log_lines = "\n".join(log_lines)
+                raise AirflowException(f'Docker container failed: {repr(result)} lines {log_lines}')

Review comment:
       Thanks.  For the future, what didn't it like there?  Reusing the same variable name?







[GitHub] [airflow] uranusjr commented on pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1025472644


   Ah right, I misread, sorry.





[GitHub] [airflow] ferruzzi commented on pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1024979992


   > See [#19027 (comment)](https://github.com/apache/airflow/pull/19027#discussion_r795019065). I still think having a separate API call is safer.
   
   Isn't this what you were asking for with that discussion?  https://github.com/apache/airflow/pull/21175/files#diff-5ec82e1fcaaa38e04471f56b64ec319f67b146d3a18d8bdb69c2f973ea3dd585R306-R311





[GitHub] [airflow] ferruzzi commented on pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1023834665


   @uranusjr - You were looking at someone else's attempt at this, which has been sitting for a while. (link in the description) 





[GitHub] [airflow] eladkal merged pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
eladkal merged pull request #21175:
URL: https://github.com/apache/airflow/pull/21175


   





[GitHub] [airflow] uranusjr commented on a change in pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r796285197



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
             working_dir=self.working_dir,
             tty=self.tty,
         )
-        lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
         try:
             self.cli.start(self.container['Id'])
 
-            line = ''
-            res_lines = []
-            return_value = None
-            for line in lines:
-                if hasattr(line, 'decode'):
+            log_lines = []
+            for log_chunk in logstream:
+                if hasattr(log_chunk, 'decode'):
                     # Note that lines returned can also be byte sequences so we have to handle decode here
-                    line = line.decode('utf-8')
-                line = line.strip()
-                res_lines.append(line)
-                self.log.info(line)
+                    log_chunk = log_chunk.decode('utf-8')
+                log_chunk = log_chunk.strip()
+                log_lines.append(log_chunk)
+                self.log.info("%s", log_chunk)
+
             result = self.cli.wait(self.container['Id'])
             if result['StatusCode'] != 0:
-                res_lines = "\n".join(res_lines)
-                raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
-            if self.retrieve_output and not return_value:
-                return_value = self._attempt_to_retrieve_result()
-            ret = None
+                log_lines = "\n".join(log_lines)
+                raise AirflowException(f'Docker container failed: {repr(result)} lines {log_lines}')

Review comment:
       Yeah, from the previous lines Mypy infers the type of `log_lines` as `List[str]`, so assigning a `str` to it raises an error.
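The point about Mypy can be sketched outside the operator. The function name `format_failure` and its parameters are hypothetical and are not part of the Airflow code; the sketch only shows that binding the joined text to a new name leaves the list variable with a single type.

```python
from typing import List


def format_failure(result: dict, log_lines: List[str]) -> str:
    """Hypothetical helper that builds the failure message from the diff."""
    # Writing `log_lines = "\n".join(log_lines)` here would assign a str to
    # a name already typed as List[str], which Mypy reports as an
    # incompatible assignment. Using a new name avoids the conflict.
    joined_log_lines = "\n".join(log_lines)
    return f"Docker container failed: {result!r} lines {joined_log_lines}"
```

At runtime both spellings behave the same; the difference only matters to the static type checker.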







[GitHub] [airflow] uranusjr commented on a change in pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r795019262



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
             working_dir=self.working_dir,
             tty=self.tty,
         )
-        lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
         try:
             self.cli.start(self.container['Id'])
 
-            line = ''
-            res_lines = []
-            return_value = None
-            for line in lines:
-                if hasattr(line, 'decode'):
+            log_lines = []
+            for log_chunk in logstream:
+                if hasattr(log_chunk, 'decode'):
                     # Note that lines returned can also be byte sequences so we have to handle decode here
-                    line = line.decode('utf-8')
-                line = line.strip()
-                res_lines.append(line)
-                self.log.info(line)
+                    log_chunk = log_chunk.decode('utf-8')

Review comment:
       We probably should have `error="surrogateescape"` here (something like this, I don’t remember the exact name) because there’s no guarantee the log is outputting valid strings, and those characters shouldn’t crash the entire job.
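For reference, the keyword on `bytes.decode` is spelled `errors`. The sketch below shows the failure mode this comment describes, assuming a log chunk that is not valid UTF-8. `decode_chunk` is a hypothetical stand-in for the decode step in the diff, not the operator's actual code.

```python
def decode_chunk(chunk, errors="strict"):
    """Hypothetical helper mirroring the decode step in the diff above."""
    if hasattr(chunk, "decode"):
        return chunk.decode("utf-8", errors=errors)
    return chunk


# Latin-1 encoded "café": the byte 0xe9 is not valid UTF-8 in this position.
bad_chunk = b"caf\xe9\n"

try:
    decode_chunk(bad_chunk)
    strict_failed = False
except UnicodeDecodeError:
    # With the default strict handler the exception would propagate and
    # fail the whole task.
    strict_failed = True

# With surrogateescape the chunk decodes and the bad byte is kept as U+DCE9.
safe = decode_chunk(bad_chunk, errors="surrogateescape")
```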

##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
             working_dir=self.working_dir,
             tty=self.tty,
         )
-        lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
         try:
             self.cli.start(self.container['Id'])
 
-            line = ''
-            res_lines = []
-            return_value = None
-            for line in lines:
-                if hasattr(line, 'decode'):
+            log_lines = []
+            for log_chunk in logstream:
+                if hasattr(log_chunk, 'decode'):
                     # Note that lines returned can also be byte sequences so we have to handle decode here
-                    line = line.decode('utf-8')
-                line = line.strip()
-                res_lines.append(line)
-                self.log.info(line)
+                    log_chunk = log_chunk.decode('utf-8')
+                log_chunk = log_chunk.strip()
+                log_lines.append(log_chunk)
+                self.log.info(log_chunk)

Review comment:
       ```suggestion
                   self.log.info("%s", log_chunk)
   ```
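The thread does not state the reason for this suggestion. The sketch below only illustrates what the suggested call does, using the standard `logging` module: the format string stays constant and the chunk is passed as an argument, so any `%` characters in the container output are emitted verbatim. `ListHandler` and the logger name are hypothetical and exist only to capture the message.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that stores formatted messages for inspection."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log = logging.getLogger("docker_operator_sketch")
log.setLevel(logging.INFO)
log.propagate = False
handler = ListHandler()
log.addHandler(handler)

# Container output may itself contain percent signs or format specifiers.
log_chunk = "download 100% complete (%d files)"
log.info("%s", log_chunk)
```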







[GitHub] [airflow] uranusjr commented on a change in pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r795419744



##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
             working_dir=self.working_dir,
             tty=self.tty,
         )
-        lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+        logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
         try:
             self.cli.start(self.container['Id'])
 
-            line = ''
-            res_lines = []
-            return_value = None
-            for line in lines:
-                if hasattr(line, 'decode'):
+            log_lines = []
+            for log_chunk in logstream:
+                if hasattr(log_chunk, 'decode'):
                     # Note that lines returned can also be byte sequences so we have to handle decode here
-                    line = line.decode('utf-8')
-                line = line.strip()
-                res_lines.append(line)
-                self.log.info(line)
+                    log_chunk = log_chunk.decode('utf-8')
+                log_chunk = log_chunk.strip()
+                log_lines.append(log_chunk)
+                self.log.info("%s", log_chunk)
+
             result = self.cli.wait(self.container['Id'])
             if result['StatusCode'] != 0:
-                res_lines = "\n".join(res_lines)
-                raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
-            if self.retrieve_output and not return_value:
-                return_value = self._attempt_to_retrieve_result()
-            ret = None
+                log_lines = "\n".join(log_lines)
+                raise AirflowException(f'Docker container failed: {repr(result)} lines {log_lines}')

Review comment:
       ```suggestion
                   joined_log_lines = "\n".join(log_lines)
                   raise AirflowException(f'Docker container failed: {repr(result)} lines {joined_log_lines}')
   ```
   
   to satisfy Mypy.










[GitHub] [airflow] ferruzzi closed pull request #21175: Fixes Docker xcom functionality

Posted by GitBox <gi...@apache.org>.
ferruzzi closed pull request #21175:
URL: https://github.com/apache/airflow/pull/21175


   

