Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/21 15:16:28 UTC

[GitHub] [airflow] smishra opened a new issue #10451: Airflow tries running Jinja on any property name ending with .json

smishra opened a new issue #10451:
URL: https://github.com/apache/airflow/issues/10451


   Looks like there is a bug in Airflow's handling of templated operator arguments when any string value ends with `.json`. Below is my DAG; please notice **"--files", "s3://dummy/spark/application.json"** in the STEPS variable.
   
   ```python
   from datetime import timedelta
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
   from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
   from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
   from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
   from airflow.utils.dates import days_ago
   
   DEFAULT_ARGS = {
       'owner': 'Commscope',
       'depends_on_past': False,
       'email': ['smishra@commscope.com'],
       'email_on_failure': False,
       'email_on_retry': False
   }
   
   
   JOB_FLOW_OVERRIDES = {
       'Name': 'PiCalc',
       'ReleaseLabel': 'emr-5.29.0',
       'Instances': {
           'InstanceGroups': [
               {
                   'Name': 'Master node',
                   'Market': 'SPOT',
                   'InstanceRole': 'MASTER',
                   'InstanceType': 'm1.medium',
                   'InstanceCount': 1,
               }
           ],
           'KeepJobFlowAliveWhenNoSteps': True,
           'TerminationProtected': False,
       },
       'JobFlowRole': 'EMR_EC2_DefaultRole',
       'ServiceRole': 'EMR_DefaultRole',
   }
   
   STEPS = [{
       "Name": "Process data",
       "ActionOnFailure": "CONTINUE",
       "HadoopJarStep": {
           "Jar": "command-runner.jar",
           "Args": [
               "--class", "com.dummy.Application",
               "--files", "s3://dummy/spark/application.json",
               "--driver-java-options",
               "-Dlog4j.configuration=log4j.properties",
               "--driver-java-options",
               "-Dconfig.resource=application.json",
               "--driver-java-options"
               "s3://dummy/spark/app-jar-with-dependencies.jar",
               "application.json"
           ]
       }
   }]
   
   with DAG(
           dag_id='data_processing',
           default_args=DEFAULT_ARGS,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(2),
           schedule_interval='0 3 * * *',
           tags=['inquire', 'bronze'],
   ) as dag:
       job_flow_creator = EmrCreateJobFlowOperator(
           task_id='launch_emr_cluster',
           job_flow_overrides=JOB_FLOW_OVERRIDES,
           aws_conn_id='aws_default',
           emr_conn_id='emr_default'
       )
   
       job_flow_sensor = EmrJobFlowSensor(
           task_id='check_cluster',
           job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
           target_states=['RUNNING', 'WAITING'],
           aws_conn_id='aws_default'
       )
   
       proc_step = EmrAddStepsOperator(
           task_id='process_data',
           job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
           aws_conn_id='aws_default',
           steps=STEPS,
       )
   
       job_flow_terminator = EmrTerminateJobFlowOperator(
           task_id='terminate_emr_cluster',
           job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
           aws_conn_id='aws_default',
           trigger_rule="all_done"
       )
   
       job_flow_creator >> job_flow_sensor >> proc_step >> job_flow_terminator
   ```
   
   The cluster launches successfully, but **Airflow fails with the following error**:
   
   ```
   [2020-08-21 15:06:42,307] {taskinstance.py:1145} ERROR - s3://dummy/spark/application.json
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 964, in _run_raw_task
       self.render_templates(context=context)
   ...
   ...
     File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 187, in get_source
       raise TemplateNotFound(template)
   jinja2.exceptions.TemplateNotFound: s3://dummy/spark/application.json
   ```
   





[GitHub] [airflow] boyapcha commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
boyapcha commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870203704


   > Airflow tries to render all the values passed to `template_fields`. In your case, as you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`
   > 
   > **Source Code**:
   > 
   > https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
   > 
   > This was added by #8572
   > 
   > You can fix this in two ways:
   > 
   > 1. Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each element of the iterable to see whether the string ends in `.json`.
   > 2. Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
   > 
   > ```python
   > class FixedEmrAddStepsOperator(BaseOperator):
   >     template_ext = ()
   > ```
   > 
   > and then you can use this Operator:
   > 
   > ```python
   >     proc_step = FixedEmrAddStepsOperator(
   >         task_id='process_data',
   >         job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
   >         aws_conn_id='aws_default',
   >         steps=STEPS,
   >     )
   > ```
   
   ------
   What should the typical subclass code look like?
   I am using the code below and getting the following error.
   
   ERROR:
   -------
   ```
   2021-06-29 03:20:04,851] {{taskinstance.py:1482}} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
       result = task_copy.execute(context=context)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 831, in execute
       raise NotImplementedError()
   NotImplementedError
   ```
   
   CODE:
   ------
   ```python
   from airflow.models import BaseOperator
   from airflow.exceptions import AirflowException
   from typing import Any, Dict, List, Optional, Union
   
   class FixedEmrAddStepsOperator(BaseOperator):
       """
       An operator that adds steps to an existing EMR job_flow.
       :param job_flow_id: id of the JobFlow to add steps to. (templated)
       :type job_flow_id: Optional[str]
       :param job_flow_name: name of the JobFlow to add steps to. Use as an alternative to passing
           job_flow_id. will search for id of JobFlow with matching name in one of the states in
           param cluster_states. Exactly one cluster like this should exist or will fail. (templated)
       :type job_flow_name: Optional[str]
       :param cluster_states: Acceptable cluster states when searching for JobFlow id by job_flow_name.
           (templated)
       :type cluster_states: list
       :param aws_conn_id: aws connection to uses
       :type aws_conn_id: str
       :param steps: boto3 style steps or reference to a steps file (must be '.json') to
           be added to the jobflow. (templated)
       :type steps: list|str
       :param do_xcom_push: if True, job_flow_id is pushed to XCom with key job_flow_id.
       :type do_xcom_push: bool
       """
   
       template_fields = ['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']
       template_ext = ()
       ui_color = '#f9c915'
   
       def __init__(
           self,
           *,
           job_flow_id: Optional[str] = None,
           job_flow_name: Optional[str] = None,
           cluster_states: Optional[List[str]] = None,
           aws_conn_id: str = 'aws_default',
           steps: Optional[Union[List[dict], str]] = None,
           **kwargs,
       ):
           if kwargs.get('xcom_push') is not None:
               raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
           if not (job_flow_id is None) ^ (job_flow_name is None):
               raise AirflowException('Exactly one of job_flow_id or job_flow_name must be specified.')
           super().__init__(**kwargs)
           cluster_states = cluster_states or []
           steps = steps or []
           self.aws_conn_id = aws_conn_id
           self.job_flow_id = job_flow_id
           self.job_flow_name = job_flow_name
           self.cluster_states = cluster_states
            self.steps = steps
   ```





[GitHub] [airflow] rsibanez89 commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
rsibanez89 commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-1081354542


   > Airflow tries to render all the values passed to `template_fields`. In your case, as you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`
   > 
   > **Source Code**:
   > 
   > https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
   > 
   > This was added by #8572
   > 
   > You can fix this in two ways:
   > 
   > 1. Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each element of the iterable to see whether the string ends in `.json`.
   > 2. Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
   > 
   > ```python
   > class FixedEmrAddStepsOperator(EmrAddStepsOperator):
   >     template_ext = ()
   > ```
   > 
   > and then you can use this Operator:
   > 
   > ```python
   >     proc_step = FixedEmrAddStepsOperator(
   >         task_id='process_data',
   >         job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
   >         aws_conn_id='aws_default',
   >         steps=STEPS,
   >     )
   > ```
   
   @kaxil this solution works but seems like a hack to me. Is there any plan to solve this properly?
   





[GitHub] [airflow] boring-cyborg[bot] commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-678345450


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] uranusjr commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870232004


   The example has a typo: you should subclass `EmrAddStepsOperator` (or any other operator you want to “fix”), not `BaseOperator`. I’ve edited the message to fix this.





[GitHub] [airflow] ashb commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870375072


   This should do it @boyapcha 
   
   ```python
   from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
   
   
   class FixedEmrAddStepsOperator(EmrAddStepsOperator):
       template_ext = ()
   ```








[GitHub] [airflow] boyapcha commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
boyapcha commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-870664881


   Thank you. It's working fine now.





[GitHub] [airflow] kaxil edited a comment on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-678671658


   Airflow tries to render all the values passed to `template_fields`. In your case, as you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`
   
   
   
   **Source Code**: https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
   
   This was added by https://github.com/apache/airflow/pull/8572 
   
   You can fix this in two ways:
   
   1) Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each element of the iterable to see whether the string ends in `.json`.
   
   2) Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
   
   ```python
   class FixedEmrAddStepsOperator(EmrAddStepsOperator):
       template_ext = ()
   ```
   
   and then you can use this Operator:
   
   ```python
       proc_step = FixedEmrAddStepsOperator(
           task_id='process_data',
           job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
           aws_conn_id='aws_default',
           steps=STEPS,
       )
   ```
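   
   For reference, here is a simplified sketch of the rendering behaviour behind this (an illustration only, not the actual Airflow source): when a templated field is rendered, any string that ends with one of the operator's `template_ext` suffixes is treated as the *name of a template file* and handed to Jinja's loader, which is why `s3://dummy/spark/application.json` fails with `TemplateNotFound`.
   
   ```python
   # Simplified illustration only -- not the real Airflow implementation.
   def render_value(value, template_ext, jinja_env, context):
       """Mimic how a value in a templated field gets rendered."""
       if isinstance(value, str):
           if value.endswith(tuple(template_ext)):
               # Strings ending in a template extension (e.g. '.json', '.sql') are
               # treated as file names and loaded via the Jinja loader; an S3 URL
               # is not a loadable file, hence TemplateNotFound.
               return jinja_env.get_template(value).render(**context)
           # Everything else is rendered as an inline template string.
           return jinja_env.from_string(value).render(**context)
       if isinstance(value, (list, tuple)):
           return [render_value(v, template_ext, jinja_env, context) for v in value]
       if isinstance(value, dict):
           return {k: render_value(v, template_ext, jinja_env, context) for k, v in value.items()}
       return value
   ```
   
   The trailing-space workaround in option 1 works simply because the string then no longer ends with `.json`.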
   
   
   











[GitHub] [airflow] kaxil commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-678671658


   Airflow tries to render all the values passed to `template_fields`. In your case, as you are using `EmrAddStepsOperator`, its `template_fields` are `['job_flow_id', 'job_flow_name', 'cluster_states', 'steps']`
   
   
   
   **Source Code**: https://github.com/apache/airflow/blob/47c6657ce012f6db147fdcce3ca5e77f46a9e491/airflow/providers/amazon/aws/operators/emr_add_steps.py#L48
   
   This was added by https://github.com/apache/airflow/pull/8572 
   
   You can fix this in two ways:
   
   1) Bypass this by adding an extra space after `.json`, for example `"s3://dummy/spark/application.json "`. This works because Airflow checks each element of the iterable to see whether the string ends in `.json`.
   
   2) Subclass `EmrAddStepsOperator` and override the `template_ext` field. Example:
   
   ```python
   class FixedEmrAddStepsOperator(BaseOperator):
       template_ext = ()
   ```
   
   and then you can use this Operator:
   
   ```python
       proc_step = FixedEmrAddStepsOperator(
           task_id='process_data',
           job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
           aws_conn_id='aws_default',
           steps=STEPS,
       )
   ```
   
   
   





[GitHub] [airflow] uranusjr commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-1081384642


   The subclassing solution is not a hack 🙂 Rendering `.json` files with Jinja2 is a deliberate feature of the operator, and if you don’t want that feature, you should make your own operator, i.e. by subclassing.
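   
   To illustrate the intended use of that feature (a hypothetical example; the file name is an assumption, not taken from this thread): because `EmrAddStepsOperator.template_ext` includes `.json`, you can keep the boto3 step definitions in a Jinja-templated JSON file alongside the DAG and pass its relative path as `steps`, and Airflow will render the file before the steps are submitted.
   
   ```python
   from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
   
   # Hypothetical usage sketch: 'steps/process_data.json' would be a file resolved
   # via the DAG's template search path and rendered with Jinja at runtime,
   # precisely because the value ends in '.json'.
   add_steps_from_file = EmrAddStepsOperator(
       task_id='add_steps_from_file',
       job_flow_id="{{ task_instance.xcom_pull(task_ids='launch_emr_cluster', key='return_value') }}",
       aws_conn_id='aws_default',
       steps='steps/process_data.json',  # treated as a template file, not a literal string
   )
   ```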








[GitHub] [airflow] smishra commented on issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
smishra commented on issue #10451:
URL: https://github.com/apache/airflow/issues/10451#issuecomment-678385795


   In case you want to answer it on StackOverflow (for others to find): https://stackoverflow.com/questions/63525443/airflow-fails-to-add-emr-step-using-emraddstep-when-hadoopjarstep-arg-has-an-arg











[GitHub] [airflow] eladkal closed issue #10451: Airflow tries running Jinja on any property name ending with .json

Posted by GitBox <gi...@apache.org>.
eladkal closed issue #10451:
URL: https://github.com/apache/airflow/issues/10451


   

