Posted to commits@airflow.apache.org by "nivangio (via GitHub)" <gi...@apache.org> on 2023/02/24 15:50:16 UTC

[GitHub] [airflow] nivangio opened a new issue, #29746: DatabricksSubmitRunOperator does not support passing output of another task to `base_parameters`

nivangio opened a new issue, #29746:
URL: https://github.com/apache/airflow/issues/29746

   ### Apache Airflow Provider(s)
   
   databricks
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-databricks==4.0.0
   
   ### Apache Airflow version
   
   2.4.3
   
   ### Operating System
   
   macOS
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   The issue is consistent across multiple Airflow deployments (locally on Docker Compose, remotely on MWAA in AWS, locally using virtualenv).
   
   ### What happened
   
   Passing the `base_parameters` key of the `notebook_task` parameter to `DatabricksSubmitRunOperator` as the output of a previous task (TaskFlow paradigm) does not work.
   
   After inspecting `DatabricksSubmitRunOperator.__init__`, it seems the problem lies in the fact that it uses `utils.databricks.normalise_json_content` to validate the input parameters and, because the input parameter is of type `PlainXComArg`, the DAG fails to parse.
   
   The workaround I found is to call it using `partial` and `expand`, which is a bit hacky and much less legible.
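
The failure can be reproduced without Airflow at all. Below is a simplified, hypothetical stand-in for `utils.databricks.normalise_json_content` (not the actual provider code): it recurses over the payload and rejects any type it does not recognise, which is exactly what happens when it meets a lazy `PlainXComArg` placeholder at DAG-parse time.

```python3
# Simplified sketch of the failure mode; NOT the actual provider code.
# FakeXComArg stands in for airflow.models.xcom_arg.PlainXComArg, a lazy
# reference that has no concrete value until the upstream task has run.

class FakeXComArg:
    """Hypothetical stand-in for a lazy XComArg reference."""

def normalise_json_content(content, json_path="json"):
    """Recursively validate a Databricks API payload (simplified)."""
    if isinstance(content, (str, bool)):
        return content
    if isinstance(content, (int, float)):
        # numbers are coerced to strings for the API
        return str(content)
    if isinstance(content, (list, tuple)):
        return [normalise_json_content(e, f"{json_path}[{i}]") for i, e in enumerate(content)]
    if isinstance(content, dict):
        return {k: normalise_json_content(v, f"{json_path}[{k}]") for k, v in content.items()}
    raise TypeError(f"Type {type(content)} used for parameter {json_path} is not a number or a string")

# A plain payload normalises fine...
normalise_json_content({"notebook_path": "some/path", "base_parameters": {"p": 1}})

# ...but an unresolved XComArg-like object blows up during DAG parsing:
try:
    normalise_json_content({"notebook_path": "some/path", "base_parameters": FakeXComArg()})
except TypeError as exc:
    print(exc)
```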
   
   ### What you think should happen instead
   
   `DatabricksSubmitRunOperator` should accept `PlainXComArg` arguments in `__init__` and defer validation to `execute`, just before submitting the job run.
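
As a rough illustration of the idea (a hedged sketch with made-up names, not the real operator), the constructor would store the payload untouched and the normalisation would run only in `execute`, after Airflow has rendered the templated fields and resolved any XComArg references into plain values:

```python3
# Hypothetical sketch of deferred validation; names and structure are
# simplified and do NOT mirror the real DatabricksSubmitRunOperator.

def _normalise(content):
    """Stand-in validator: accept JSON-like data, reject anything else."""
    if isinstance(content, (str, bool, int, float)) or content is None:
        return content
    if isinstance(content, (list, tuple)):
        return [_normalise(e) for e in content]
    if isinstance(content, dict):
        return {k: _normalise(v) for k, v in content.items()}
    raise TypeError(f"unsupported type {type(content)}")

class SubmitRunSketch:
    # Airflow resolves template_fields (including XComArg values) before
    # calling execute(), so that is the earliest safe point to validate.
    template_fields = ("json",)

    def __init__(self, json=None):
        # Accept anything here, even unresolved XComArg placeholders.
        self.json = json or {}

    def execute(self, context):
        # Validate only now: the payload is plain Python data at this point.
        self.json = _normalise(self.json)
        return self.json

# Construction no longer fails for an opaque placeholder...
op = SubmitRunSketch(json={"base_parameters": object()})
# ...validation happens at execute() time instead:
resolved = SubmitRunSketch(json={"base_parameters": {"p": "v"}}).execute(context={})
```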
   
   ### How to reproduce
   
   This DAG fails to parse:
   
   ```python3
   from airflow import DAG
   from airflow.decorators import task
   from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
   from airflow.utils.dates import days_ago

   with DAG(
       "dag_erroring",
       start_date=days_ago(1),
       params={"param_1": "", "param_2": ""},
   ) as dag:
   
       @task
       def from_dag_params_to_notebook_params(**context):
   
           # Transform/validate DAG input parameters to something expected by the notebook
           notebook_param_1 = context["dag_run"].conf["param_1"] + "abcd"
           notebook_param_2 = context["dag_run"].conf["param_2"] + "efgh"
   
           return {"some_param": notebook_param_1, "some_other_param": notebook_param_2}
   
       DatabricksSubmitRunOperator(
           task_id="my_notebook_task",
           new_cluster={
               "cluster_name": "single-node-cluster",
               "spark_version": "7.6.x-scala2.12",
               "node_type_id": "i3.xlarge",
               "num_workers": 0,
               "spark_conf": {
                   "spark.databricks.cluster.profile": "singleNode",
                   "spark.master": "[*, 4]",
               },
               "custom_tags": {"ResourceClass": "SingleNode"},
           },
           notebook_task={
               "notebook_path": "some/path/to/a/notebook",
               "base_parameters": from_dag_params_to_notebook_params(),
           },
           libraries=[],
           databricks_retry_limit=3,
           timeout_seconds=86400,
           polling_period_seconds=20,
       )
   ```
   
   This one does not:
   
   ```python3
   from airflow import DAG
   from airflow.decorators import task
   from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
   from airflow.utils.dates import days_ago

   with DAG(
       "dag_parsing_fine",
       start_date=days_ago(1),
       params={"param_1": "", "param_2": ""},
   ) as dag:
   
       @task
       def from_dag_params_to_notebook_params(**context):
   
           # Transform/validate DAG input parameters to something expected by the notebook
           notebook_param_1 = context["dag_run"].conf["param_1"] + "abcd"
           notebook_param_2 = context["dag_run"].conf["param_2"] + "efgh"
   
           return [{"notebook_path": "some/path/to/a/notebook", "base_parameters":{"some_param": notebook_param_1, "some_other_param": notebook_param_2}}]
   
       DatabricksSubmitRunOperator.partial(
           task_id="my_notebook_task",
           new_cluster={
               "cluster_name": "single-node-cluster",
               "spark_version": "7.6.x-scala2.12",
               "node_type_id": "i3.xlarge",
               "num_workers": 0,
               "spark_conf": {
                   "spark.databricks.cluster.profile": "singleNode",
                   "spark.master": "[*, 4]",
               },
               "custom_tags": {"ResourceClass": "SingleNode"},
           },
           libraries=[],
           databricks_retry_limit=3,
           timeout_seconds=86400,
           polling_period_seconds=20,
       ).expand(notebook_task=from_dag_params_to_notebook_params())
   
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] josh-fell closed issue #29746: DatabricksSubmitRunOperator does not support passing output of another task to `base_parameters`

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell closed issue #29746: DatabricksSubmitRunOperator does not support passing output of another task to `base_parameters`
URL: https://github.com/apache/airflow/issues/29746




[GitHub] [airflow] nivangio commented on issue #29746: DatabricksSubmitRunOperator does not support passing output of another task to `base_parameters`

Posted by "nivangio (via GitHub)" <gi...@apache.org>.
nivangio commented on issue #29746:
URL: https://github.com/apache/airflow/issues/29746#issuecomment-1446692807

   @josh-fell great! I will probably submit something along the lines of what was mentioned above. If there is any objection to the approach (i.e., moving the validation from `__init__` to `execute`), please let me know!




[GitHub] [airflow] josh-fell commented on issue #29746: DatabricksSubmitRunOperator does not support passing output of another task to `base_parameters`

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on issue #29746:
URL: https://github.com/apache/airflow/issues/29746#issuecomment-1446699349

   Not that I can think of off the top of my head. Generally it's best to keep the constructor as simple as possible and leave all validation to `execute()`, so I think the approach would be a good one!




[GitHub] [airflow] nivangio commented on issue #29746: DatabricksSubmitRunOperator does not support passing output of another task to `base_parameters`

Posted by "nivangio (via GitHub)" <gi...@apache.org>.
nivangio commented on issue #29746:
URL: https://github.com/apache/airflow/issues/29746#issuecomment-1446702905

   Awesome! Thx a lot for your prompt response :)




[GitHub] [airflow] josh-fell commented on issue #29746: DatabricksSubmitRunOperator does not support passing output of another task to `base_parameters`

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on issue #29746:
URL: https://github.com/apache/airflow/issues/29746#issuecomment-1444978741

   @nivangio All yours, feel free to submit a PR.

