Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/21 15:16:28 UTC
[GitHub] [airflow] smishra opened a new issue #10451: Airflow tries running Jinja on any property name ending with .json
smishra opened a new issue #10451:
URL: https://github.com/apache/airflow/issues/10451
There appears to be a bug in how Airflow handles a templated operator argument when it contains any string that ends with _.json_. My DAG is below; please note the **"--files", "s3://dummy/spark/application.json"** entry in the STEPS variable.
`from datetime import timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
from airflow.utils.dates import days_ago

DEFAULT_ARGS = {
    'owner': 'Commscope',
    'depends_on_past': False,
    'email': ['smishra@commscope.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'ReleaseLabel': 'emr-5.29.0',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm1.medium',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}

STEPS = [{
    "Name": "Process data",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "--class", "com.dummy.Application",
            "--files", "s3://dummy/spark/application.json",
            "--driver-java-options",
            "-Dlog4j.configuration=log4j.properties",
            "--driver-java-options",
            "-Dconfig.resource=application.json",
            "--driver-java-options",
            "s3://dummy/spark/app-jar-with-dependencies.jar",
            "application.json"
        ]
    }
}]

with DAG(
    dag_id='data_processing',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(2),
    schedule_interval='0 3 * * *',
    tags=['inquire', 'bronze'],
) as dag:
    job_flow_creator = EmrCreateJobFlowOperator(
        task_id='launch_emr_cluster',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default'
    )
    job_flow_sensor = EmrJobFlowSensor(
        task_id='check_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
        target_states=['RUNNING', 'WAITING'],
        aws_conn_id='aws_default'
    )
    proc_step = EmrAddStepsOperator(
        task_id='process_data',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=STEPS,
    )
    job_flow_terminator = EmrTerminateJobFlowOperator(
        task_id='terminate_emr_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
        aws_conn_id='aws_default',
        trigger_rule="all_done"
    )

    job_flow_creator >> job_flow_sensor >> proc_step >> job_flow_terminator
`
The cluster launches successfully, but **Airflow fails with the following error**:
`[2020-08-21 15:06:42,307] {taskinstance.py:1145} ERROR - s3://dummy/spark/application.json
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 964, in _run_raw_task
self.render_templates(context=context)
...
...
File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 187, in get_source
raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: s3://dummy/spark/application.json`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boyapcha commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
boyapcha commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870203704
> Airflow tries to render all the values passed to `template_fields`. In your case, since you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`.
>
> **Source Code**:
>
> https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
>
> This was added by #8572
>
> You can fix this in two ways:
>
> 1. Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each string element of the iterable to see whether it ends in `.json`.
> 2. Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
>
> ```python
> class FixedEmrAddStepsOperator(BaseOperator):
>     template_ext = ()
> ```
>
> and then you can use this operator:
>
> ```python
> proc_step = FixedEmrAddStepsOperator(
>     task_id='process_data',
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
>     aws_conn_id='aws_default',
>     steps=STEPS,
> )
> ```
------
What should the typical subclass code look like?
I am using the code below and getting the following error.
ERROR:
-------
[2021-06-29 03:20:04,851] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 831, in execute
    raise NotImplementedError()
NotImplementedError
CODE:
------
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from typing import Any, Dict, List, Optional, Union


class FixedEmrAddStepsOperator(BaseOperator):
    """
    An operator that adds steps to an existing EMR job_flow.

    :param job_flow_id: id of the JobFlow to add steps to. (templated)
    :type job_flow_id: Optional[str]
    :param job_flow_name: name of the JobFlow to add steps to. Use as an alternative to passing
        job_flow_id. Will search for id of JobFlow with matching name in one of the states in
        param cluster_states. Exactly one cluster like this should exist or will fail. (templated)
    :type job_flow_name: Optional[str]
    :param cluster_states: Acceptable cluster states when searching for JobFlow id by job_flow_name.
        (templated)
    :type cluster_states: list
    :param aws_conn_id: aws connection to use
    :type aws_conn_id: str
    :param steps: boto3 style steps or reference to a steps file (must be '.json') to
        be added to the jobflow. (templated)
    :type steps: list|str
    :param do_xcom_push: if True, job_flow_id is pushed to XCom with key job_flow_id.
    :type do_xcom_push: bool
    """

    template_fields = ['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']
    template_ext = ()
    ui_color = '#f9c915'

    def __init__(
        self,
        *,
        job_flow_id: Optional[str] = None,
        job_flow_name: Optional[str] = None,
        cluster_states: Optional[List[str]] = None,
        aws_conn_id: str = 'aws_default',
        steps: Optional[Union[List[dict], str]] = None,
        **kwargs,
    ):
        if kwargs.get('xcom_push') is not None:
            raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
        if not (job_flow_id is None) ^ (job_flow_name is None):
            raise AirflowException('Exactly one of job_flow_id or job_flow_name must be specified.')
        super().__init__(**kwargs)
        cluster_states = cluster_states or []
        steps = steps or []
        self.aws_conn_id = aws_conn_id
        self.job_flow_id = job_flow_id
        self.job_flow_name = job_flow_name
        self.cluster_states = cluster_states
        self.steps = steps
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] rsibanez89 commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
rsibanez89 commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-1081354542
> Airflow tries to render all the values passed to `template_fields`. In your case, since you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`.
>
> **Source Code**:
>
> https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
>
> This was added by #8572
>
> You can fix this in two ways:
>
> 1. Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each string element of the iterable to see whether it ends in `.json`.
> 2. Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
>
> ```python
> class FixedEmrAddStepsOperator(EmrAddStepsOperator):
>     template_ext = ()
> ```
>
> and then you can use this operator:
>
> ```python
> proc_step = FixedEmrAddStepsOperator(
>     task_id='process_data',
>     job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
>     aws_conn_id='aws_default',
>     steps=STEPS,
> )
> ```
@kaxil this solution works, but it seems like a hack to me. Is there any plan to solve this properly?
[GitHub] [airflow] boring-cyborg[bot] commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-678345450
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] uranusjr commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870232004
The example has a typo: you should subclass `EmrAddStepsOperator` (or any other operator you want to “fix”), not `BaseOperator`. I’ve edited the message to fix this.
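The `NotImplementedError` in the traceback above comes from ordinary Python inheritance: the traceback shows `execute` in `baseoperator.py` raising it, so a class derived directly from `BaseOperator` with no `execute` of its own fails at run time. A minimal sketch with stand-in classes follows; `StandInBaseOperator`, `StandInEmrAddStepsOperator`, `WrongFix`, `RightFix`, and `run` are hypothetical names used for illustration only, not Airflow classes.

```python
class StandInBaseOperator:
    """Hypothetical stand-in for a base class whose execute() must be overridden."""
    template_ext = ()

    def execute(self, context):
        raise NotImplementedError()


class StandInEmrAddStepsOperator(StandInBaseOperator):
    """Hypothetical stand-in for a concrete operator that implements execute()."""
    template_ext = (".json",)

    def execute(self, context):
        return "steps added"


class WrongFix(StandInBaseOperator):
    # Derives from the base class, so execute() is still the unimplemented one.
    template_ext = ()


class RightFix(StandInEmrAddStepsOperator):
    # Derives from the concrete operator, so execute() is inherited;
    # only the class attribute template_ext is overridden.
    template_ext = ()


def run(op):
    """Call execute() and report the outcome as a string."""
    try:
        return op.execute(context={})
    except NotImplementedError:
        return "NotImplementedError"


print(run(WrongFix()))
print(run(RightFix()))
```

Under these assumptions, overriding only `template_ext` on a subclass of the concrete operator keeps all of its behaviour and changes nothing else.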
[GitHub] [airflow] ashb commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
ashb commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870375072
This should do it @boyapcha
```python
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
class FixedEmrAddStepsOperator(EmrAddStepsOperator):
    template_ext = ()
```
[GitHub] [airflow] boyapcha commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
boyapcha commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870664881
Thank you. It's working fine now.
[GitHub] [airflow] kaxil edited a comment on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-678671658
Airflow tries to render all the values passed to `template_fields`. In your case, since you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`.
**Source Code**: https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
This was added by https://github.com/apache/airflow/pull/8572
You can fix this in two ways:
1) Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each string element of the iterable to see whether it ends in `.json`.
2) Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
```python
class FixedEmrAddStepsOperator(EmrAddStepsOperator):
    template_ext = ()
```
and then you can use this operator:
```python
proc_step = FixedEmrAddStepsOperator(
    task_id='process_data',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=STEPS,
)
```
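To make the suffix check described above concrete, here is a minimal sketch. It is not Airflow's implementation: `would_load_as_template_file` and `find_file_lookups` are hypothetical helpers written for illustration, and the `(".json",)` tuple is an assumed value for the operator's `template_ext`.

```python
from typing import Any, List, Sequence


def would_load_as_template_file(value: str, template_ext: Sequence[str]) -> bool:
    """Return True if the string ends with one of the given extensions."""
    return any(value.endswith(ext) for ext in template_ext)


def find_file_lookups(value: Any, template_ext: Sequence[str]) -> List[str]:
    """Recursively collect the strings in a nested structure that match."""
    if isinstance(value, str):
        return [value] if would_load_as_template_file(value, template_ext) else []
    if isinstance(value, dict):
        return [hit for v in value.values() for hit in find_file_lookups(v, template_ext)]
    if isinstance(value, (list, tuple)):
        return [hit for v in value for hit in find_file_lookups(v, template_ext)]
    return []


# Reduced version of the STEPS structure from the issue. The last argument
# carries a trailing space, as in workaround 1.
steps = [{
    "Name": "Process data",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["--files", "s3://dummy/spark/application.json", "application.json "],
    },
}]

hits = find_file_lookups(steps, (".json",))
print(hits)                           # strings that would be looked up as files
print(find_file_lookups(steps, ()))   # an empty template_ext matches nothing
```

In this sketch only the string without the trailing space matches, and an empty extension tuple matches nothing, which is the effect workaround 2 relies on. Note that with workaround 1 the trailing space remains part of the argument string.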
[GitHub] [airflow] kaxil commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-678671658
Airflow tries to render all the values passed to `template_fields`. In your case, since you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`.
**Source Code**: https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
This was added by https://github.com/apache/airflow/pull/8572
You can fix this in two ways:
1) Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each string element of the iterable to see whether it ends in `.json`.
2) Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
```python
class FixedEmrAddStepsOperator(BaseOperator):
    template_ext = ()
```
and then you can use this operator:
```python
proc_step = FixedEmrAddStepsOperator(
    task_id='process_data',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=STEPS,
)
```
[GitHub] [airflow] uranusjr commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-1081384642
The subclassing solution is not a hack 🙂 Rendering `.json` with Jinja2 is a feature by design on the operator, and if you don’t want that feature, you should make your own operator, e.g. by subclassing.
[GitHub] [airflow] smishra commented on issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
smishra commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-678385795
In case you want to answer it on StackOverflow (for others to find): https://stackoverflow.com/questions/63525443/airflow-fails-to-add-emr-step-using-emraddstep-when-hadoopjarstep-arg-has-an-arg
[GitHub] [airflow] eladkal closed issue #10451: Airflow tries running Jinja on any property name ending with .json
Posted by GitBox <gi...@apache.org>.
eladkal closed issue #10451:
URL: https://github.com/apache/airflow/issues/10451