Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/28 02:44:25 UTC
[GitHub] [airflow] ferruzzi opened a new pull request #21175: Fixes Docker xcom functionality
ferruzzi opened a new pull request #21175:
URL: https://github.com/apache/airflow/pull/21175
closes: https://github.com/apache/airflow/pull/19027
completes: https://github.com/apache/airflow/pull/19027/
Co-Author: https://github.com/asaf400
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ferruzzi commented on a change in pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r796050520
##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
working_dir=self.working_dir,
tty=self.tty,
)
- lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+ logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])
- line = ''
- res_lines = []
- return_value = None
- for line in lines:
- if hasattr(line, 'decode'):
+ log_lines = []
+ for log_chunk in logstream:
+ if hasattr(log_chunk, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
- line = line.decode('utf-8')
- line = line.strip()
- res_lines.append(line)
- self.log.info(line)
+ log_chunk = log_chunk.decode('utf-8')
Review comment:
Added in https://github.com/apache/airflow/pull/21175/commits/12df34c1d94e0c9d7fed44784d594109b3a1820d
Just making sure I understand, adding the `surrogateescape` here allows our decode to pick up and fix anything that was escaped in the initial encode, assuming they used the same `surrogateescape` there? Is that right?
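Here is a minimal standalone sketch of the round trip the question describes. It uses only built-in `bytes`/`str` methods, and the sample bytes are made up. Decoding with `errors="surrogateescape"` maps each byte that is not valid UTF-8 to a lone surrogate code point instead of raising. Encoding with the same handler turns those surrogates back into the original bytes.

```python
# Arbitrary sample: valid UTF-8 text plus one byte (0xff) that can never start a UTF-8 sequence.
raw = b"progress: 50% \xff done"

# Decoding with surrogateescape does not raise: 0xff becomes the lone surrogate U+DCFF.
text = raw.decode("utf-8", errors="surrogateescape")

# Encoding with the same handler maps U+DCFF back to the byte 0xff.
restored = text.encode("utf-8", errors="surrogateescape")

# A default (strict) encode refuses the lone surrogate instead of restoring it.
try:
    text.encode("utf-8")
    strict_encode_ok = True
except UnicodeEncodeError:
    strict_encode_ok = False

print(repr(text))
print(restored == raw, strict_encode_ok)  # True False
```

In other words, the handler does not repair invalid bytes. It carries them through the `str` reversibly, and text decoded this way can still fail later if something encodes it with the default strict handler.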
[GitHub] [airflow] uranusjr commented on pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1024857545
See https://github.com/apache/airflow/pull/19027#discussion_r795019065. I still think having a separate API call is safer.
[GitHub] [airflow] github-actions[bot] commented on pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1026507529
The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.
[GitHub] [airflow] ferruzzi commented on a change in pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r795090462
##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
working_dir=self.working_dir,
tty=self.tty,
)
- lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+ logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])
- line = ''
- res_lines = []
- return_value = None
- for line in lines:
- if hasattr(line, 'decode'):
+ log_lines = []
+ for log_chunk in logstream:
+ if hasattr(log_chunk, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
- line = line.decode('utf-8')
- line = line.strip()
- res_lines.append(line)
- self.log.info(line)
+ log_chunk = log_chunk.decode('utf-8')
Review comment:
I haven't run into that before. I'll read up on it and get the change in on Monday.
[GitHub] [airflow] ferruzzi commented on pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1024525090
Rebased and corrected the co-author in the commit message.
[GitHub] [airflow] ferruzzi commented on a change in pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r796046005
##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
working_dir=self.working_dir,
tty=self.tty,
)
- lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+ logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])
- line = ''
- res_lines = []
- return_value = None
- for line in lines:
- if hasattr(line, 'decode'):
+ log_lines = []
+ for log_chunk in logstream:
+ if hasattr(log_chunk, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
- line = line.decode('utf-8')
- line = line.strip()
- res_lines.append(line)
- self.log.info(line)
+ log_chunk = log_chunk.decode('utf-8')
+ log_chunk = log_chunk.strip()
+ log_lines.append(log_chunk)
+ self.log.info("%s", log_chunk)
+
result = self.cli.wait(self.container['Id'])
if result['StatusCode'] != 0:
- res_lines = "\n".join(res_lines)
- raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
- if self.retrieve_output and not return_value:
- return_value = self._attempt_to_retrieve_result()
- ret = None
+ log_lines = "\n".join(log_lines)
+ raise AirflowException(f'Docker container failed: {repr(result)} lines {log_lines}')
Review comment:
Thanks. For the future, what didn't it like there? Reusing the same variable name?
[GitHub] [airflow] uranusjr commented on pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1025472644
Ah right, I misread, sorry.
[GitHub] [airflow] ferruzzi commented on pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1024979992
> See [#19027 (comment)](https://github.com/apache/airflow/pull/19027#discussion_r795019065). I still think having a separate API call is safer.
Isn't this what you were asking for with that discussion? https://github.com/apache/airflow/pull/21175/files#diff-5ec82e1fcaaa38e04471f56b64ec319f67b146d3a18d8bdb69c2f973ea3dd585R306-R311
[GitHub] [airflow] ferruzzi commented on pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#issuecomment-1023834665
@uranusjr - You were looking at someone else's attempt at this, which has been sitting for a while. (link in the description)
[GitHub] [airflow] eladkal merged pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
eladkal merged pull request #21175:
URL: https://github.com/apache/airflow/pull/21175
[GitHub] [airflow] uranusjr commented on a change in pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r796285197
##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
working_dir=self.working_dir,
tty=self.tty,
)
- lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+ logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])
- line = ''
- res_lines = []
- return_value = None
- for line in lines:
- if hasattr(line, 'decode'):
+ log_lines = []
+ for log_chunk in logstream:
+ if hasattr(log_chunk, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
- line = line.decode('utf-8')
- line = line.strip()
- res_lines.append(line)
- self.log.info(line)
+ log_chunk = log_chunk.decode('utf-8')
+ log_chunk = log_chunk.strip()
+ log_lines.append(log_chunk)
+ self.log.info("%s", log_chunk)
+
result = self.cli.wait(self.container['Id'])
if result['StatusCode'] != 0:
- res_lines = "\n".join(res_lines)
- raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
- if self.retrieve_output and not return_value:
- return_value = self._attempt_to_retrieve_result()
- ret = None
+ log_lines = "\n".join(log_lines)
+ raise AirflowException(f'Docker container failed: {repr(result)} lines {log_lines}')
Review comment:
Yeah, from the previous lines Mypy infers the type of `log_lines` as `List[str]`, so assigning a `str` to it raises an error.
[GitHub] [airflow] uranusjr commented on a change in pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r795019262
##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
working_dir=self.working_dir,
tty=self.tty,
)
- lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+ logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])
- line = ''
- res_lines = []
- return_value = None
- for line in lines:
- if hasattr(line, 'decode'):
+ log_lines = []
+ for log_chunk in logstream:
+ if hasattr(log_chunk, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
- line = line.decode('utf-8')
- line = line.strip()
- res_lines.append(line)
- self.log.info(line)
+ log_chunk = log_chunk.decode('utf-8')
Review comment:
We probably should have `errors="surrogateescape"` here (something like this, I don’t remember the exact name) because there’s no guarantee the log is outputting valid strings, and those characters shouldn’t crash the entire job.
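The following minimal sketch illustrates the concern. It decodes a chunk that is not valid UTF-8 under three error handlers. The sample chunk is invented (Latin-1 bytes for "été"), and the `strip()` mirrors the operator's loop.

```python
chunk = b"result: \xe9t\xe9\n"  # Latin-1 encoded "été": not valid UTF-8

# Default (strict) decoding raises, which would abort the log-reading loop.
try:
    chunk.decode("utf-8")
    strict_raised = False
except UnicodeDecodeError:
    strict_raised = True

# surrogateescape keeps going and preserves each bad byte as a lone surrogate.
escaped = chunk.decode("utf-8", errors="surrogateescape").strip()

# "replace" also keeps going, but substitutes U+FFFD and loses the original bytes.
replaced = chunk.decode("utf-8", errors="replace").strip()

print(strict_raised)  # True
print(repr(escaped))
print(repr(replaced))
```

Choosing a handler is a trade-off. `surrogateescape` keeps the exact bytes, but the resulting strings are rejected by a strict UTF-8 encoder. `replace` always yields encodable text, but it discards the original bytes.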
##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
working_dir=self.working_dir,
tty=self.tty,
)
- lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+ logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])
- line = ''
- res_lines = []
- return_value = None
- for line in lines:
- if hasattr(line, 'decode'):
+ log_lines = []
+ for log_chunk in logstream:
+ if hasattr(log_chunk, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
- line = line.decode('utf-8')
- line = line.strip()
- res_lines.append(line)
- self.log.info(line)
+ log_chunk = log_chunk.decode('utf-8')
+ log_chunk = log_chunk.strip()
+ log_lines.append(log_chunk)
+ self.log.info(log_chunk)
Review comment:
```suggestion
self.log.info("%s", log_chunk)
```
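The suggestion moves the container output out of the format-string position. The standalone sketch below uses the standard `logging` module. It assumes that Airflow's `self.log` behaves like a `logging.Logger`, and the logger name and `ListHandler` class are made up for the example. Both calls render the same text, but only the second keeps the record's format string fixed as `"%s"`, so the chunk is never treated as a template.

```python
import logging


class ListHandler(logging.Handler):
    """Keeps emitted records in memory so they can be inspected (illustration only)."""

    def __init__(self) -> None:
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger("docker_operator_sketch")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

# Container output that happens to contain %-format syntax.
log_chunk = "progress: 50% (step %d)"

logger.info(log_chunk)        # the chunk itself is the format string
logger.info("%s", log_chunk)  # the chunk is passed as an argument

first, second = handler.records
print(repr(first.msg), first.args)
print(repr(second.msg), second.args)
print(first.getMessage() == second.getMessage() == log_chunk)  # True
```

The standard library only applies %-formatting when arguments are supplied, so the bare call does not crash on a stray `%`. The `"%s"` form mainly guarantees that log text is always treated as data rather than as format syntax.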
[GitHub] [airflow] uranusjr commented on a change in pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21175:
URL: https://github.com/apache/airflow/pull/21175#discussion_r795419744
##########
File path: airflow/providers/docker/operators/docker.py
##########
@@ -275,32 +275,40 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
working_dir=self.working_dir,
tty=self.tty,
)
- lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
+ logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])
- line = ''
- res_lines = []
- return_value = None
- for line in lines:
- if hasattr(line, 'decode'):
+ log_lines = []
+ for log_chunk in logstream:
+ if hasattr(log_chunk, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
- line = line.decode('utf-8')
- line = line.strip()
- res_lines.append(line)
- self.log.info(line)
+ log_chunk = log_chunk.decode('utf-8')
+ log_chunk = log_chunk.strip()
+ log_lines.append(log_chunk)
+ self.log.info("%s", log_chunk)
+
result = self.cli.wait(self.container['Id'])
if result['StatusCode'] != 0:
- res_lines = "\n".join(res_lines)
- raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
- if self.retrieve_output and not return_value:
- return_value = self._attempt_to_retrieve_result()
- ret = None
+ log_lines = "\n".join(log_lines)
+ raise AirflowException(f'Docker container failed: {repr(result)} lines {log_lines}')
Review comment:
```suggestion
joined_log_lines = "\n".join(log_lines)
raise AirflowException(f'Docker container failed: {repr(result)} lines {joined_log_lines}')
```
to satisfy Mypy.
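Put together, the reviewed loop might look like the standalone sketch below. It is not the operator's actual code. `collect_log_lines` and `ContainerFailed` (standing in for `AirflowException`) are hypothetical, as is the integer `status_code`, which stands in for the `result` returned by `self.cli.wait`. An `isinstance` check also replaces the `hasattr(..., 'decode')` test. The sketch applies the `surrogateescape` decode suggested in the review and the Mypy-friendly rename.

```python
from typing import Iterable, List, Union


class ContainerFailed(Exception):
    """Hypothetical stand-in for AirflowException in this sketch."""


def collect_log_lines(logstream: Iterable[Union[bytes, str]], status_code: int) -> List[str]:
    """Decode, strip and collect each chunk, then fail if the container exited non-zero."""
    log_lines: List[str] = []
    for log_chunk in logstream:
        # Chunks may be bytes or str; decode bytes without crashing on invalid UTF-8.
        if isinstance(log_chunk, bytes):
            log_chunk = log_chunk.decode("utf-8", errors="surrogateescape")
        log_lines.append(log_chunk.strip())

    if status_code != 0:
        # A new name keeps log_lines typed as List[str] instead of rebinding it to a str.
        joined_log_lines = "\n".join(log_lines)
        raise ContainerFailed(f"Docker container failed: status {status_code} lines {joined_log_lines}")
    return log_lines


print(collect_log_lines([b"hello \n", "world\n"], 0))  # ['hello', 'world']
```

Binding the joined text to a new name gives each variable a single type, which is what Mypy checks. Annotating `log_lines` as `Union[List[str], str]` would also silence Mypy, but every later use would then have to narrow the union.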
[GitHub] [airflow] ferruzzi closed pull request #21175: Fixes Docker xcom functionality
Posted by GitBox <gi...@apache.org>.
ferruzzi closed pull request #21175:
URL: https://github.com/apache/airflow/pull/21175