Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/29 08:05:28 UTC

[GitHub] [airflow] boyapcha edited a comment on issue #10451: Airflow tries running Jinja on any property name ending with .json

boyapcha edited a comment on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870203704


   > Airflow tries to render all the values passed to `template_fields`. In your case, since you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`
   > 
   > **Source Code**:
   > 
   > https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
   > 
   > This was added by #8572
   > 
   > You can fix this in two ways:
   > 
   > 1. Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each element of the iterable to see whether the string ends in `.json`
   > 2. Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
   > 
   > ```python
   > class FixedEmrAddStepsOperator(EmrAddStepsOperator):
   >     template_ext = ()
   > ```
   > 
   > and then you can use this Operator:
   > 
   > ```python
   >     proc_step = FixedEmrAddStepsOperator(
   >         task_id='process_data',
   >         job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
   >         aws_conn_id='aws_default',
   >         steps=STEPS,
   >     )
   > ```
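   The trailing-space trick in option 1 can be illustrated with a self-contained sketch. The names below are illustrative, not Airflow's actual internals: the point is that a templated string is treated as a path to a template file only when it ends with one of the operator's `template_ext` extensions, so any extra character after `.json` defeats the check.

```python
# Hypothetical sketch of the extension check applied to templated values.
# TEMPLATE_EXT and looks_like_template_file are illustrative names only.
TEMPLATE_EXT = ('.json',)

def looks_like_template_file(value):
    """Return True if a string value would be treated as a template file path."""
    return isinstance(value, str) and value.endswith(TEMPLATE_EXT)

# A step argument ending in '.json' triggers a template-file lookup...
print(looks_like_template_file("s3://dummy/spark/application.json"))   # True
# ...while a trailing space defeats the endswith() check:
print(looks_like_template_file("s3://dummy/spark/application.json "))  # False
```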
   
   ------
   What should the typical subclass code look like?
   I am using the code below and getting the following error.
   
   ERROR:
   -------
   ```
    [2021-06-29 03:20:04,851] {{taskinstance.py:1482}} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
       result = task_copy.execute(context=context)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 831, in execute
       raise NotImplementedError()
   NotImplementedError
   ```
   
   CODE:
   ------
    ```python
   from airflow.models import BaseOperator
   from airflow.exceptions import AirflowException
   from typing import Any, Dict, List, Optional, Union
   
   class FixedEmrAddStepsOperator(BaseOperator):
       """
       An operator that adds steps to an existing EMR job_flow.
       :param job_flow_id: id of the JobFlow to add steps to. (templated)
       :type job_flow_id: Optional[str]
       :param job_flow_name: name of the JobFlow to add steps to. Use as an alternative to passing
           job_flow_id. will search for id of JobFlow with matching name in one of the states in
           param cluster_states. Exactly one cluster like this should exist or will fail. (templated)
       :type job_flow_name: Optional[str]
       :param cluster_states: Acceptable cluster states when searching for JobFlow id by job_flow_name.
           (templated)
       :type cluster_states: list
       :param aws_conn_id: aws connection to uses
       :type aws_conn_id: str
       :param steps: boto3 style steps or reference to a steps file (must be '.json') to
           be added to the jobflow. (templated)
       :type steps: list|str
       :param do_xcom_push: if True, job_flow_id is pushed to XCom with key job_flow_id.
       :type do_xcom_push: bool
       """
   
       template_fields = ['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']
       template_ext = ()
       ui_color = '#f9c915'
   
       def __init__(
           self,
           *,
           job_flow_id: Optional[str] = None,
           job_flow_name: Optional[str] = None,
           cluster_states: Optional[List[str]] = None,
           aws_conn_id: str = 'aws_default',
           steps: Optional[Union[List[dict], str]] = None,
           **kwargs,
       ):
           if kwargs.get('xcom_push') is not None:
               raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
           if not (job_flow_id is None) ^ (job_flow_name is None):
               raise AirflowException('Exactly one of job_flow_id or job_flow_name must be specified.')
           super().__init__(**kwargs)
           cluster_states = cluster_states or []
           steps = steps or []
           self.aws_conn_id = aws_conn_id
           self.job_flow_id = job_flow_id
           self.job_flow_name = job_flow_name
           self.cluster_states = cluster_states
           self.steps = steps
   ```
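   The traceback above points at the cause: the class inherits from `BaseOperator`, whose `execute()` only raises `NotImplementedError`, and nothing in the subclass overrides it. A self-contained sketch with stub classes (standing in for the real Airflow ones, not importable from it) shows the difference between subclassing `BaseOperator` and subclassing the concrete `EmrAddStepsOperator`:

```python
# Stub classes, standing in for the real Airflow ones, to show why the
# traceback above ends in NotImplementedError.

class BaseOperator:                       # stand-in for airflow.models.BaseOperator
    template_ext = ('.json',)

    def execute(self, context):
        # What baseoperator.py does at the line shown in the traceback.
        raise NotImplementedError()

class EmrAddStepsOperator(BaseOperator):  # stand-in for the provider operator
    def execute(self, context):
        return 'steps added'              # the real operator calls EMR here

class BrokenOperator(BaseOperator):       # mirrors the code in the question:
    template_ext = ()                     # execute() is still the base one

class FixedEmrAddStepsOperator(EmrAddStepsOperator):  # subclass the concrete
    template_ext = ()                                 # operator instead

try:
    BrokenOperator().execute({})
except NotImplementedError:
    print('BrokenOperator fails exactly like the DAG run above')

print(FixedEmrAddStepsOperator().execute({}))  # 'steps added'
```

   In other words, only `template_ext` should change; the EMR logic must still come from the provider's operator.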


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org