Posted to commits@airflow.apache.org by "kyle-winkelman (via GitHub)" <gi...@apache.org> on 2023/02/27 20:28:28 UTC

[GitHub] [airflow] kyle-winkelman opened a new pull request, #29790: Provider Databricks add jobs create operator.

kyle-winkelman opened a new pull request, #29790:
URL: https://github.com/apache/airflow/pull/29790

   closes: #29733


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kyle-winkelman commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "kyle-winkelman (via GitHub)" <gi...@apache.org>.
kyle-winkelman commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1128575214


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -151,6 +151,145 @@ def get_link(
         return XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
 
 
+class DatabricksJobsCreateOperator(BaseOperator):
+    """
+    Creates (or resets) a Databricks job using the
+    `api/2.1/jobs/create
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate>`_
+    (or `api/2.1/jobs/reset
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsReset>`_)
+    API endpoint.
+
+    .. seealso::
+        https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`concepts:jinja-templating`.
+    :param name: An optional name for the job.
+    :param tags: A map of tags associated with the job.
+    :param tasks: A list of task specifications to be executed by this job.
+        Array of objects (JobTaskSettings).
+    :param job_clusters: A list of job cluster specifications that can be shared and reused by
+        tasks of this job. Array of objects (JobCluster).
+    :param email_notifications: Object (JobEmailNotifications).
+    :param webhook_notifications: Object (WebhookNotifications).
+    :param timeout_seconds: An optional timeout applied to each run of this job.
+    :param schedule: Object (CronSchedule).
+    :param max_concurrent_runs: An optional maximum allowed number of concurrent runs of the job.
+    :param git_source: An optional specification for a remote repository containing the notebooks
+        used by this job's notebook tasks. Object (GitSource).
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks
+            UI.
+    :param databricks_conn_id: Reference to the
+        :ref:`Databricks connection <howto/connection:databricks>`. (templated)
+    :param polling_period_seconds: Controls the rate which we poll for the result of
+        this run. By default the operator will poll every 30 seconds.
+    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
+        unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+            might be a floating point number).
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    """
+
+    # Used in airflow.models.BaseOperator
+    template_fields: Sequence[str] = ("json", "databricks_conn_id")
+    # Databricks brand color (blue) under white text
+    ui_color = "#1CB1C2"
+    ui_fgcolor = "#fff"
+
+    def __init__(
+        self,
+        *,
+        json: Any | None = None,
+        name: str | None = None,
+        tags: dict[str, str] | None = None,
+        tasks: list[object] | None = None,
+        job_clusters: list[object] | None = None,
+        email_notifications: object | None = None,
+        webhook_notifications: object | None = None,
+        timeout_seconds: int | None = None,
+        schedule: dict[str, str] | None = None,
+        max_concurrent_runs: int | None = None,
+        git_source: dict[str, str] | None = None,
+        access_control_list: list[dict[str, str]] | None = None,

Review Comment:
   Do you think this new operator should wait for the new Python API that you expect to be available soon?





[GitHub] [airflow] github-actions[bot] closed pull request #29790: Provider Databricks add jobs create operator.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #29790: Provider Databricks add jobs create operator.
URL: https://github.com/apache/airflow/pull/29790




[GitHub] [airflow] alexott commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "alexott (via GitHub)" <gi...@apache.org>.
alexott commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1125673044


##########
airflow/providers/databricks/operators/databricks.py:
##########

Review Comment:
   For new operators, I would really prefer real data structures (for example, data classes) over plain objects: plain objects give users no auto-completion, and it is easy to make a mistake in an untyped JSON definition.
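
To illustrate the suggestion, here is a minimal sketch of typed job settings built with data classes. The class names `CronSchedule` and `JobSettings` and the `to_json` helper are hypothetical and not part of the provider; the field names are assumed to mirror the Jobs API payload, and only a few of the operator's parameters are shown.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class CronSchedule:
    """Hypothetical typed counterpart of the ``schedule`` object."""

    quartz_cron_expression: str
    timezone_id: str
    pause_status: str = "UNPAUSED"


@dataclass
class JobSettings:
    """Hypothetical typed subset of the job definition."""

    name: str | None = None
    tags: dict[str, str] = field(default_factory=dict)
    timeout_seconds: int | None = None
    max_concurrent_runs: int | None = None
    schedule: CronSchedule | None = None

    def to_json(self) -> dict[str, Any]:
        # asdict() recurses into nested data classes; unset values are dropped
        # so that only explicitly provided keys reach the request body.
        return {k: v for k, v in asdict(self).items() if v not in (None, {})}


settings = JobSettings(
    name="nightly",
    schedule=CronSchedule("0 0 2 * * ?", "UTC"),
    max_concurrent_runs=1,
)
payload = settings.to_json()
```

With a structure like this, a misspelled field name fails at construction time (or in the editor) instead of reaching the API as an unknown JSON key.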



##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -151,6 +151,145 @@ def get_link(
         return XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
 
 
+class DatabricksJobsCreateOperator(BaseOperator):
+    """
+    Creates (or resets) a Databricks job using the
+    `api/2.1/jobs/create
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate>`_
+    (or `api/2.1/jobs/reset
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsReset>`_)
+    API endpoint.
+
+    .. seealso::
+        https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`concepts:jinja-templating`.
+    :param name: An optional name for the job.
+    :param tags: A map of tags associated with the job.
+    :param tasks: A list of task specifications to be executed by this job.
+        Array of objects (JobTaskSettings).
+    :param job_clusters: A list of job cluster specifications that can be shared and reused by
+        tasks of this job. Array of objects (JobCluster).
+    :param email_notifications: Object (JobEmailNotifications).
+    :param webhook_notifications: Object (WebhookNotifications).
+    :param timeout_seconds: An optional timeout applied to each run of this job.
+    :param schedule: Object (CronSchedule).
+    :param max_concurrent_runs: An optional maximum allowed number of concurrent runs of the job.
+    :param git_source: An optional specification for a remote repository containing the notebooks
+        used by this job's notebook tasks. Object (GitSource).
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks
+            UI.
+    :param databricks_conn_id: Reference to the
+        :ref:`Databricks connection <howto/connection:databricks>`. (templated)
+    :param polling_period_seconds: Controls the rate which we poll for the result of
+        this run. By default the operator will poll every 30 seconds.
+    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
+        unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+            might be a floating point number).
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    """
+
+    # Used in airflow.models.BaseOperator
+    template_fields: Sequence[str] = ("json", "databricks_conn_id")
+    # Databricks brand color (blue) under white text
+    ui_color = "#1CB1C2"
+    ui_fgcolor = "#fff"
+
+    def __init__(
+        self,
+        *,
+        json: Any | None = None,
+        name: str | None = None,
+        tags: dict[str, str] | None = None,
+        tasks: list[object] | None = None,
+        job_clusters: list[object] | None = None,
+        email_notifications: object | None = None,
+        webhook_notifications: object | None = None,
+        timeout_seconds: int | None = None,
+        schedule: dict[str, str] | None = None,
+        max_concurrent_runs: int | None = None,
+        git_source: dict[str, str] | None = None,
+        access_control_list: list[dict[str, str]] | None = None,
+        databricks_conn_id: str = "databricks_default",
+        polling_period_seconds: int = 30,
+        databricks_retry_limit: int = 3,
+        databricks_retry_delay: int = 1,
+        databricks_retry_args: dict[Any, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        """Creates a new ``DatabricksJobsCreateOperator``."""
+        super().__init__(**kwargs)
+        self.json = json or {}
+        self.databricks_conn_id = databricks_conn_id
+        self.polling_period_seconds = polling_period_seconds
+        self.databricks_retry_limit = databricks_retry_limit
+        self.databricks_retry_delay = databricks_retry_delay
+        self.databricks_retry_args = databricks_retry_args
+        if name is not None:
+            self.json["name"] = name
+        if tags is not None:
+            self.json["tags"] = tags
+        if tasks is not None:
+            self.json["tasks"] = tasks
+        if job_clusters is not None:
+            self.json["job_clusters"] = job_clusters
+        if email_notifications is not None:
+            self.json["email_notifications"] = email_notifications
+        if webhook_notifications is not None:
+            self.json["webhook_notifications"] = webhook_notifications
+        if timeout_seconds is not None:
+            self.json["timeout_seconds"] = timeout_seconds
+        if schedule is not None:
+            self.json["schedule"] = schedule
+        if max_concurrent_runs is not None:
+            self.json["max_concurrent_runs"] = max_concurrent_runs
+        if git_source is not None:
+            self.json["git_source"] = git_source
+        if access_control_list is not None:
+            self.json["access_control_list"] = access_control_list
+
+        self.json = normalise_json_content(self.json)
+
+    @cached_property
+    def _hook(self):
+        return self._get_hook(caller="DatabricksJobsCreateOperator")
+
+    def _get_hook(self, caller: str) -> DatabricksHook:
+        return DatabricksHook(
+            self.databricks_conn_id,
+            retry_limit=self.databricks_retry_limit,
+            retry_delay=self.databricks_retry_delay,
+            retry_args=self.databricks_retry_args,
+            caller=caller,
+        )
+
+    def execute(self, context: Context) -> int:
+        self.job_id = self.xcom_pull(
+            context,
+            task_ids=self.task_id,
+            include_prior_dates=True,
+        )
+        if self.job_id:
+            self._hook.reset(self.job_id, self.json)

Review Comment:
   Instead of resetting unconditionally, I would recommend fetching the current job definition, comparing it with the new definition, and resetting only if the definition has changed (which should happen relatively rarely).
   
   Also, you need to handle the following edge cases:
   
   * The job is deleted via the UI: the reset will fail because the job ID no longer exists.
   * The XCom information can be lost, in which case we would create a duplicate of the job.
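
The flow described above could be sketched as follows. This is only an illustration under stated assumptions: `JobsClient`, `JobNotFound`, `InMemoryJobs`, and `upsert_job` are hypothetical names, not methods of `DatabricksHook`, and looking the job up by name when the stored ID is missing is one possible way to avoid duplicates, not something the thread prescribes.

```python
from __future__ import annotations

from typing import Any, Protocol


class JobNotFound(Exception):
    """Raised by the hypothetical client when a job ID no longer exists."""


class JobsClient(Protocol):
    """Hypothetical minimal interface; not the DatabricksHook API."""

    def get_settings(self, job_id: int) -> dict[str, Any]: ...
    def find_job_id_by_name(self, name: str) -> int | None: ...
    def create(self, settings: dict[str, Any]) -> int: ...
    def reset(self, job_id: int, settings: dict[str, Any]) -> None: ...


def upsert_job(client: JobsClient, settings: dict[str, Any], known_job_id: int | None) -> int:
    """Create the job, or reset it only when its definition differs."""
    current = None
    job_id = known_job_id
    if job_id is not None:
        try:
            current = client.get_settings(job_id)
        except JobNotFound:
            job_id = None  # edge case: job was deleted, e.g. via the UI
    if job_id is None:
        # Edge case: stored ID lost or stale; look up by name to avoid a duplicate.
        job_id = client.find_job_id_by_name(settings["name"])
        if job_id is not None:
            current = client.get_settings(job_id)
    if job_id is None:
        return client.create(settings)
    if current != settings:
        client.reset(job_id, settings)
    return job_id


class InMemoryJobs:
    """In-memory stand-in for a workspace, used only to exercise the sketch."""

    def __init__(self) -> None:
        self.jobs: dict[int, dict[str, Any]] = {}
        self.resets = 0
        self._next_id = 1

    def get_settings(self, job_id: int) -> dict[str, Any]:
        if job_id not in self.jobs:
            raise JobNotFound(job_id)
        return self.jobs[job_id]

    def find_job_id_by_name(self, name: str) -> int | None:
        for job_id, stored in self.jobs.items():
            if stored.get("name") == name:
                return job_id
        return None

    def create(self, settings: dict[str, Any]) -> int:
        job_id = self._next_id
        self._next_id += 1
        self.jobs[job_id] = dict(settings)
        return job_id

    def reset(self, job_id: int, settings: dict[str, Any]) -> None:
        self.jobs[job_id] = dict(settings)
        self.resets += 1
```

Note that a real service may return the stored settings with server-side defaults filled in, so a plain equality check like the one above may need a normalisation step before comparing.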





[GitHub] [airflow] alexott commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "alexott (via GitHub)" <gi...@apache.org>.
alexott commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1519047185

   @potiuk Unfortunately, no news. The decision is taking longer than planned. If we can deprecate it in a later version, let's merge it.




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1447037997

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (ruff, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally. It is a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow the [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] alexott commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "alexott (via GitHub)" <gi...@apache.org>.
alexott commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1128577520


##########
airflow/providers/databricks/operators/databricks.py:
##########

Review Comment:
   I’m waiting for an answer from product management - maybe we won’t need this operator…





[GitHub] [airflow] potiuk commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1454823483

   cc: @alexott ?




[GitHub] [airflow] kyle-winkelman commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "kyle-winkelman (via GitHub)" <gi...@apache.org>.
kyle-winkelman commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1156865784


##########
airflow/providers/databricks/operators/databricks.py:
##########

Review Comment:
   Have you heard anything from the Databricks product management @alexott?





[GitHub] [airflow] stikkireddy commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "stikkireddy (via GitHub)" <gi...@apache.org>.
stikkireddy commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1579269914

   Hey @kyle-winkelman, do you have the bandwidth to move forward with this PR?




[GitHub] [airflow] potiuk commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1519031642

   > @alexott convinced me that it needs changes.
   
   @alexott - is there any news from Databricks on this one? We are considering merging it regardless, but it seems you wanted some input from the team first?




[GitHub] [airflow] stikkireddy commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "stikkireddy (via GitHub)" <gi...@apache.org>.
stikkireddy commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1604803451

   @potiuk is it okay if someone else forks Kyle's repo and fixes the static check, so that @kyle-winkelman's credit/commits are not lost, and also addresses the previous comments made by @alexott? I am also okay with waiting a bit for Kyle to respond.




[GitHub] [airflow] alexott commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "alexott (via GitHub)" <gi...@apache.org>.
alexott commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1156868220


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -151,6 +151,145 @@ def get_link(
         return XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
 
 
+class DatabricksJobsCreateOperator(BaseOperator):
+    """
+    Creates (or resets) a Databricks job using the
+    `api/2.1/jobs/create
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate>`_
+    (or `api/2.1/jobs/reset
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsReset>`_)
+    API endpoint.
+
+    .. seealso::
+        https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`concepts:jinja-templating`.
+    :param name: An optional name for the job.
+    :param tags: A map of tags associated with the job.
+    :param tasks: A list of task specifications to be executed by this job.
+        Array of objects (JobTaskSettings).
+    :param job_clusters: A list of job cluster specifications that can be shared and reused by
+        tasks of this job. Array of objects (JobCluster).
+    :param email_notifications: Object (JobEmailNotifications).
+    :param webhook_notifications: Object (WebhookNotifications).
+    :param timeout_seconds: An optional timeout applied to each run of this job.
+    :param schedule: Object (CronSchedule).
+    :param max_concurrent_runs: An optional maximum allowed number of concurrent runs of the job.
+    :param git_source: An optional specification for a remote repository containing the notebooks
+        used by this job's notebook tasks. Object (GitSource).
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks
+            UI.
+    :param databricks_conn_id: Reference to the
+        :ref:`Databricks connection <howto/connection:databricks>`. (templated)
+    :param polling_period_seconds: Controls the rate which we poll for the result of
+        this run. By default the operator will poll every 30 seconds.
+    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
+        unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+            might be a floating point number).
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    """
+
+    # Used in airflow.models.BaseOperator
+    template_fields: Sequence[str] = ("json", "databricks_conn_id")
+    # Databricks brand color (blue) under white text
+    ui_color = "#1CB1C2"
+    ui_fgcolor = "#fff"
+
+    def __init__(
+        self,
+        *,
+        json: Any | None = None,
+        name: str | None = None,
+        tags: dict[str, str] | None = None,
+        tasks: list[object] | None = None,
+        job_clusters: list[object] | None = None,
+        email_notifications: object | None = None,
+        webhook_notifications: object | None = None,
+        timeout_seconds: int | None = None,
+        schedule: dict[str, str] | None = None,
+        max_concurrent_runs: int | None = None,
+        git_source: dict[str, str] | None = None,
+        access_control_list: list[dict[str, str]] | None = None,

Review Comment:
   no decision yet.





[GitHub] [airflow] eladkal commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1169249943


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -151,6 +151,145 @@ def get_link(
         return XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
 
 
+class DatabricksJobsCreateOperator(BaseOperator):
+    """
+    Creates (or resets) a Databricks job using the
+    `api/2.1/jobs/create
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate>`_
+    (or `api/2.1/jobs/reset
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsReset>`_)
+    API endpoint.
+
+    .. seealso::
+        https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`concepts:jinja-templating`.
+    :param name: An optional name for the job.
+    :param tags: A map of tags associated with the job.
+    :param tasks: A list of task specifications to be executed by this job.
+        Array of objects (JobTaskSettings).
+    :param job_clusters: A list of job cluster specifications that can be shared and reused by
+        tasks of this job. Array of objects (JobCluster).
+    :param email_notifications: Object (JobEmailNotifications).
+    :param webhook_notifications: Object (WebhookNotifications).
+    :param timeout_seconds: An optional timeout applied to each run of this job.
+    :param schedule: Object (CronSchedule).
+    :param max_concurrent_runs: An optional maximum allowed number of concurrent runs of the job.
+    :param git_source: An optional specification for a remote repository containing the notebooks
+        used by this job's notebook tasks. Object (GitSource).
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks
+            UI.
+    :param databricks_conn_id: Reference to the
+        :ref:`Databricks connection <howto/connection:databricks>`. (templated)
+    :param polling_period_seconds: Controls the rate which we poll for the result of
+        this run. By default the operator will poll every 30 seconds.
+    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
+        unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+            might be a floating point number).
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    """
+
+    # Used in airflow.models.BaseOperator
+    template_fields: Sequence[str] = ("json", "databricks_conn_id")
+    # Databricks brand color (blue) under white text
+    ui_color = "#1CB1C2"
+    ui_fgcolor = "#fff"
+
+    def __init__(
+        self,
+        *,
+        json: Any | None = None,
+        name: str | None = None,
+        tags: dict[str, str] | None = None,
+        tasks: list[object] | None = None,
+        job_clusters: list[object] | None = None,
+        email_notifications: object | None = None,
+        webhook_notifications: object | None = None,
+        timeout_seconds: int | None = None,
+        schedule: dict[str, str] | None = None,
+        max_concurrent_runs: int | None = None,
+        git_source: dict[str, str] | None = None,
+        access_control_list: list[dict[str, str]] | None = None,

Review Comment:
   @alexott let's please reach a decision in the coming days (before the next wave of providers). If not, I will accept the PR as is.
   If Databricks later decides against these operators and presents arguments for why we shouldn't have them, we can always remove them in a major release.
   





[GitHub] [airflow] potiuk commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1519060322

   > @potiuk Unfortunately no news. The decision takes longer than planned. If we can deprecate it in the later versions, let merge it.
   
   OK. Cool. I am a big fan of tactical solutions that solve part of a problem, or a problem for a smaller group of people, and of releasing them early, as long as they do not prevent strategic long-term solutions that need a longer debate and far more work to implement later. This one looks like one of those.
   
   I re-reviewed it and it looks good. @kyle-winkelman -> can you please rebase and fix the static check problem? Then I am happy to merge it (@eladkal - WDYT?).
   
   




[GitHub] [airflow] kyle-winkelman commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "kyle-winkelman (via GitHub)" <gi...@apache.org>.
kyle-winkelman commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1454871094

   My specific use case is that I want to run multiple tasks in the same job on a single shared job cluster (`job_clusters`). The DatabricksSubmitRunOperator cannot do this because it doesn't support the `job_clusters` parameter, so the only option there is to create a `new_cluster` for every task I want to run.
   
   The other approach is to use the DatabricksRunNowOperator, which has the limitation that the Databricks Job must be defined somewhere else and in some different manner (e.g. manually in the Databricks UI, via a custom CI/CD pipeline, etc.). My team doesn't like having the definition of the job separated from its use in Airflow. In my opinion, a DatabricksRunNowOperator with just a single `job_id` lacks information about what is actually happening.
   
   So to sum up, this operator is useful when paired with the DatabricksRunNowOperator to define a job and run it in the same DAG.
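   
   As a rough illustration of the shape of such a job definition, here is a minimal sketch of a Jobs API 2.1 payload in which two tasks share one job cluster. The job name, cluster settings, notebook paths and the `clusters_used` helper are made up for the example; only the top-level keys (`job_clusters`, `tasks`, `job_cluster_key`, `task_key`, `depends_on`, `notebook_task`) come from the Jobs API.

```python
# Illustrative payload only: names, cluster sizes and notebook paths are hypothetical.
shared_cluster_key = "shared_cluster"

job_spec = {
    "name": "example-multi-task-job",
    # One cluster definition, declared once at the job level.
    "job_clusters": [
        {
            "job_cluster_key": shared_cluster_key,
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
        }
    ],
    # Both tasks point at the same cluster through job_cluster_key.
    "tasks": [
        {
            "task_key": "ingest",
            "job_cluster_key": shared_cluster_key,
            "notebook_task": {"notebook_path": "/Shared/ingest"},
        },
        {
            "task_key": "transform",
            "depends_on": [{"task_key": "ingest"}],
            "job_cluster_key": shared_cluster_key,
            "notebook_task": {"notebook_path": "/Shared/transform"},
        },
    ],
}


def clusters_used(spec):
    """Return the set of job cluster keys referenced by the tasks (hypothetical helper)."""
    return {task["job_cluster_key"] for task in spec["tasks"]}
```

   In a DAG, I would expect to pass this as `DatabricksJobsCreateOperator(task_id="create_job", json=job_spec)` and feed the resulting job id to a DatabricksRunNowOperator. That wiring is an assumption based on the `-> int` return annotation of `execute` in this PR, not something the sketch above verifies.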




[GitHub] [airflow] alexott commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "alexott (via GitHub)" <gi...@apache.org>.
alexott commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1454858582

   @kyle-winkelman what are the use cases for that operator?




[GitHub] [airflow] alexott commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "alexott (via GitHub)" <gi...@apache.org>.
alexott commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1126908172


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -151,6 +151,145 @@ def get_link(
         return XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
 
 
+class DatabricksJobsCreateOperator(BaseOperator):
+    """
+    Creates (or resets) a Databricks job using the
+    `api/2.1/jobs/create
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate>`_
+    (or `api/2.1/jobs/reset
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsReset>`_)
+    API endpoint.
+
+    .. seealso::
+        https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`concepts:jinja-templating`.
+    :param name: An optional name for the job.
+    :param tags: A map of tags associated with the job.
+    :param tasks: A list of task specifications to be executed by this job.
+        Array of objects (JobTaskSettings).
+    :param job_clusters: A list of job cluster specifications that can be shared and reused by
+        tasks of this job. Array of objects (JobCluster).
+    :param email_notifications: Object (JobEmailNotifications).
+    :param webhook_notifications: Object (WebhookNotifications).
+    :param timeout_seconds: An optional timeout applied to each run of this job.
+    :param schedule: Object (CronSchedule).
+    :param max_concurrent_runs: An optional maximum allowed number of concurrent runs of the job.
+    :param git_source: An optional specification for a remote repository containing the notebooks
+        used by this job's notebook tasks. Object (GitSource).
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks
+            UI.
+    :param databricks_conn_id: Reference to the
+        :ref:`Databricks connection <howto/connection:databricks>`. (templated)
+    :param polling_period_seconds: Controls the rate which we poll for the result of
+        this run. By default the operator will poll every 30 seconds.
+    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
+        unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+            might be a floating point number).
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    """
+
+    # Used in airflow.models.BaseOperator
+    template_fields: Sequence[str] = ("json", "databricks_conn_id")
+    # Databricks brand color (blue) under white text
+    ui_color = "#1CB1C2"
+    ui_fgcolor = "#fff"
+
+    def __init__(
+        self,
+        *,
+        json: Any | None = None,
+        name: str | None = None,
+        tags: dict[str, str] | None = None,
+        tasks: list[object] | None = None,
+        job_clusters: list[object] | None = None,
+        email_notifications: object | None = None,
+        webhook_notifications: object | None = None,
+        timeout_seconds: int | None = None,
+        schedule: dict[str, str] | None = None,
+        max_concurrent_runs: int | None = None,
+        git_source: dict[str, str] | None = None,
+        access_control_list: list[dict[str, str]] | None = None,

Review Comment:
   I don't have an existing example; these were just thoughts for the new operators. A new Python API will be available soon that will provide access to the latest APIs. I need to ask the dev team when it's coming.





[GitHub] [airflow] alexott commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "alexott (via GitHub)" <gi...@apache.org>.
alexott commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1156867834


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -151,6 +151,145 @@ def get_link(
         return XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
 
 
+class DatabricksJobsCreateOperator(BaseOperator):
+    """
+    Creates (or resets) a Databricks job using the
+    `api/2.1/jobs/create
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate>`_
+    (or `api/2.1/jobs/reset
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsReset>`_)
+    API endpoint.
+
+    .. seealso::
+        https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`concepts:jinja-templating`.
+    :param name: An optional name for the job.
+    :param tags: A map of tags associated with the job.
+    :param tasks: A list of task specifications to be executed by this job.
+        Array of objects (JobTaskSettings).
+    :param job_clusters: A list of job cluster specifications that can be shared and reused by
+        tasks of this job. Array of objects (JobCluster).
+    :param email_notifications: Object (JobEmailNotifications).
+    :param webhook_notifications: Object (WebhookNotifications).
+    :param timeout_seconds: An optional timeout applied to each run of this job.
+    :param schedule: Object (CronSchedule).
+    :param max_concurrent_runs: An optional maximum allowed number of concurrent runs of the job.
+    :param git_source: An optional specification for a remote repository containing the notebooks
+        used by this job's notebook tasks. Object (GitSource).
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks
+            UI.
+    :param databricks_conn_id: Reference to the
+        :ref:`Databricks connection <howto/connection:databricks>`. (templated)
+    :param polling_period_seconds: Controls the rate which we poll for the result of
+        this run. By default the operator will poll every 30 seconds.
+    :param databricks_retry_limit: Amount of times retry if the Databricks backend is
+        unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+            might be a floating point number).
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    """
+
+    # Used in airflow.models.BaseOperator
+    template_fields: Sequence[str] = ("json", "databricks_conn_id")
+    # Databricks brand color (blue) under white text
+    ui_color = "#1CB1C2"
+    ui_fgcolor = "#fff"
+
+    def __init__(
+        self,
+        *,
+        json: Any | None = None,
+        name: str | None = None,
+        tags: dict[str, str] | None = None,
+        tasks: list[object] | None = None,
+        job_clusters: list[object] | None = None,
+        email_notifications: object | None = None,
+        webhook_notifications: object | None = None,
+        timeout_seconds: int | None = None,
+        schedule: dict[str, str] | None = None,
+        max_concurrent_runs: int | None = None,
+        git_source: dict[str, str] | None = None,
+        access_control_list: list[dict[str, str]] | None = None,
+        databricks_conn_id: str = "databricks_default",
+        polling_period_seconds: int = 30,
+        databricks_retry_limit: int = 3,
+        databricks_retry_delay: int = 1,
+        databricks_retry_args: dict[Any, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        """Creates a new ``DatabricksJobsCreateOperator``."""
+        super().__init__(**kwargs)
+        self.json = json or {}
+        self.databricks_conn_id = databricks_conn_id
+        self.polling_period_seconds = polling_period_seconds
+        self.databricks_retry_limit = databricks_retry_limit
+        self.databricks_retry_delay = databricks_retry_delay
+        self.databricks_retry_args = databricks_retry_args
+        if name is not None:
+            self.json["name"] = name
+        if tags is not None:
+            self.json["tags"] = tags
+        if tasks is not None:
+            self.json["tasks"] = tasks
+        if job_clusters is not None:
+            self.json["job_clusters"] = job_clusters
+        if email_notifications is not None:
+            self.json["email_notifications"] = email_notifications
+        if webhook_notifications is not None:
+            self.json["webhook_notifications"] = webhook_notifications
+        if timeout_seconds is not None:
+            self.json["timeout_seconds"] = timeout_seconds
+        if schedule is not None:
+            self.json["schedule"] = schedule
+        if max_concurrent_runs is not None:
+            self.json["max_concurrent_runs"] = max_concurrent_runs
+        if git_source is not None:
+            self.json["git_source"] = git_source
+        if access_control_list is not None:
+            self.json["access_control_list"] = access_control_list
+
+        self.json = normalise_json_content(self.json)
+
+    @cached_property
+    def _hook(self):
+        return self._get_hook(caller="DatabricksJobsCreateOperator")
+
+    def _get_hook(self, caller: str) -> DatabricksHook:
+        return DatabricksHook(
+            self.databricks_conn_id,
+            retry_limit=self.databricks_retry_limit,
+            retry_delay=self.databricks_retry_delay,
+            retry_args=self.databricks_retry_args,
+            caller=caller,
+        )
+
+    def execute(self, context: Context) -> int:
+        self.job_id = self.xcom_pull(
+            context,
+            task_ids=self.task_id,
+            include_prior_dates=True,
+        )
+        if self.job_id:
+            self._hook.reset(self.job_id, self.json)

Review Comment:
   not yet





[GitHub] [airflow] potiuk commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1604888950

   > @potiuk is it okay if someone else forks Kyle's repo and fixes the static check, so that @kyle-winkelman's credit/commits are not lost, and also addresses the previous comments made by @alexott? I am also okay with waiting a bit for Kyle to respond.
   
   Fine for me.




[GitHub] [airflow] kyle-winkelman commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "kyle-winkelman (via GitHub)" <gi...@apache.org>.
kyle-winkelman commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1126905101


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -151,6 +151,145 @@ def get_link(
         return XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
 
 
+class DatabricksJobsCreateOperator(BaseOperator):
+    """
+    Creates (or resets) a Databricks job using the
+    `api/2.1/jobs/create
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate>`_
+    (or `api/2.1/jobs/reset
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsReset>`_)
+    API endpoint.
+
+    .. seealso::
+        https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`concepts:jinja-templating`.
+    :param name: An optional name for the job.
+    :param tags: A map of tags associated with the job.
+    :param tasks: A list of task specifications to be executed by this job.
+        Array of objects (JobTaskSettings).
+    :param job_clusters: A list of job cluster specifications that can be shared and reused by
+        tasks of this job. Array of objects (JobCluster).
+    :param email_notifications: Object (JobEmailNotifications).
+    :param webhook_notifications: Object (WebhookNotifications).
+    :param timeout_seconds: An optional timeout applied to each run of this job.
+    :param schedule: Object (CronSchedule).
+    :param max_concurrent_runs: An optional maximum allowed number of concurrent runs of the job.
+    :param git_source: An optional specification for a remote repository containing the notebooks
+        used by this job's notebook tasks. Object (GitSource).
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks
+            UI.
+    :param databricks_conn_id: Reference to the
+        :ref:`Databricks connection <howto/connection:databricks>`. (templated)
+    :param polling_period_seconds: Controls the rate at which we poll for the result of
+        this run. By default the operator will poll every 30 seconds.
+    :param databricks_retry_limit: Number of times to retry if the Databricks backend is
+        unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+        might be a floating point number).
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    """
+
+    # Used in airflow.models.BaseOperator
+    template_fields: Sequence[str] = ("json", "databricks_conn_id")
+    # Databricks brand color (blue) under white text
+    ui_color = "#1CB1C2"
+    ui_fgcolor = "#fff"
+
+    def __init__(
+        self,
+        *,
+        json: Any | None = None,
+        name: str | None = None,
+        tags: dict[str, str] | None = None,
+        tasks: list[object] | None = None,
+        job_clusters: list[object] | None = None,
+        email_notifications: object | None = None,
+        webhook_notifications: object | None = None,
+        timeout_seconds: int | None = None,
+        schedule: dict[str, str] | None = None,
+        max_concurrent_runs: int | None = None,
+        git_source: dict[str, str] | None = None,
+        access_control_list: list[dict[str, str]] | None = None,

Review Comment:
   Do you have an example of a different operator that follows this pattern? It would be helpful to see an example.
   
   Any thoughts on a tool to generate such data structures from the [Databricks OpenAPI Spec](https://docs.databricks.com/_extras/api-refs/jobs-2.1-aws.yaml)? There are a lot of data structures that would need to be created and I don't want to do so manually.
   
   My initial thought was to rely on [Databricks Python API](https://docs.databricks.com/dev-tools/python-api.html), but it doesn't have these data structures or validations either.
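
To make the question concrete, here is a minimal hand-written sketch of what one such data structure could look like for the `schedule` argument. It is not generated from the OpenAPI spec and is not part of the PR. The class name `CronSchedule` and its three fields are taken from the Jobs 2.1 API reference as an assumption; the `to_json` helper and the validation rule are hypothetical. A generator such as datamodel-code-generator might be able to emit similar classes from the spec, but that has not been tried here.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class CronSchedule:
    """Hypothetical typed wrapper for the ``schedule`` object of a job spec."""

    quartz_cron_expression: str
    timezone_id: str
    pause_status: str = "UNPAUSED"

    def __post_init__(self) -> None:
        # Minimal validation; a generated model would derive this from the spec.
        if self.pause_status not in ("PAUSED", "UNPAUSED"):
            raise ValueError(f"invalid pause_status: {self.pause_status!r}")

    def to_json(self) -> dict[str, str]:
        # Plain dict that could be passed as the operator's ``schedule`` argument.
        return asdict(self)
```

With a class like this, `schedule=CronSchedule("0 0 12 * * ?", "UTC").to_json()` would keep the operator signature unchanged while moving validation to the caller.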





[GitHub] [airflow] alexott commented on a diff in pull request #29790: Provider Databricks add jobs create operator.

Posted by "alexott (via GitHub)" <gi...@apache.org>.
alexott commented on code in PR #29790:
URL: https://github.com/apache/airflow/pull/29790#discussion_r1126908172


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -151,6 +151,145 @@ def get_link(
         return XCom.get_value(key=XCOM_RUN_PAGE_URL_KEY, ti_key=ti_key)
 
 
+class DatabricksJobsCreateOperator(BaseOperator):
+    """
+    Creates (or resets) a Databricks job using the
+    `api/2.1/jobs/create
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate>`_
+    (or `api/2.1/jobs/reset
+    <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsReset>`_)
+    API endpoint.
+
+    .. seealso::
+        https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+
+    :param json: A JSON object containing API parameters which will be passed
+        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
+        be merged with this json dictionary if they are provided.
+        If there are conflicts during the merge, the named parameters will
+        take precedence and override the top level json keys. (templated)
+
+        .. seealso::
+            For more information about templating see :ref:`concepts:jinja-templating`.
+    :param name: An optional name for the job.
+    :param tags: A map of tags associated with the job.
+    :param tasks: A list of task specifications to be executed by this job.
+        Array of objects (JobTaskSettings).
+    :param job_clusters: A list of job cluster specifications that can be shared and reused by
+        tasks of this job. Array of objects (JobCluster).
+    :param email_notifications: Object (JobEmailNotifications).
+    :param webhook_notifications: Object (WebhookNotifications).
+    :param timeout_seconds: An optional timeout applied to each run of this job.
+    :param schedule: Object (CronSchedule).
+    :param max_concurrent_runs: An optional maximum allowed number of concurrent runs of the job.
+    :param git_source: An optional specification for a remote repository containing the notebooks
+        used by this job's notebook tasks. Object (GitSource).
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks
+            UI.
+    :param databricks_conn_id: Reference to the
+        :ref:`Databricks connection <howto/connection:databricks>`. (templated)
+    :param polling_period_seconds: Controls the rate at which we poll for the result of
+        this run. By default the operator will poll every 30 seconds.
+    :param databricks_retry_limit: Number of times to retry if the Databricks backend is
+        unreachable. Its value must be greater than or equal to 1.
+    :param databricks_retry_delay: Number of seconds to wait between retries (it
+        might be a floating point number).
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    """
+
+    # Used in airflow.models.BaseOperator
+    template_fields: Sequence[str] = ("json", "databricks_conn_id")
+    # Databricks brand color (blue) under white text
+    ui_color = "#1CB1C2"
+    ui_fgcolor = "#fff"
+
+    def __init__(
+        self,
+        *,
+        json: Any | None = None,
+        name: str | None = None,
+        tags: dict[str, str] | None = None,
+        tasks: list[object] | None = None,
+        job_clusters: list[object] | None = None,
+        email_notifications: object | None = None,
+        webhook_notifications: object | None = None,
+        timeout_seconds: int | None = None,
+        schedule: dict[str, str] | None = None,
+        max_concurrent_runs: int | None = None,
+        git_source: dict[str, str] | None = None,
+        access_control_list: list[dict[str, str]] | None = None,

Review Comment:
   I don't have an existing example; it was just a thought for new operators. A new Python API will be available soon that will provide access to the latest APIs. I need to ask the dev team when it's coming.
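
For reference, the merge rule stated in the quoted docstring (named parameters override top-level `json` keys) can be sketched as follows. This is an illustration under the assumption that the merge is shallow and that parameters left as `None` are skipped; the function name `merge_job_spec` is hypothetical and this is not the code from the PR.

```python
from __future__ import annotations

from typing import Any


def merge_job_spec(json: dict[str, Any] | None = None, **named: Any) -> dict[str, Any]:
    """Overlay named parameters that are not None on a copy of ``json``."""
    # Copy so the caller's dictionary is not mutated.
    merged = dict(json or {})
    # Shallow merge: a named parameter replaces the whole top-level key.
    merged.update({key: value for key, value in named.items() if value is not None})
    return merged
```

For example, `merge_job_spec({"name": "a", "timeout_seconds": 60}, name="b")` keeps `timeout_seconds` from the dictionary and takes `name` from the named parameter.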





[GitHub] [airflow] lennartkats-db commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "lennartkats-db (via GitHub)" <gi...@apache.org>.
lennartkats-db commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1568527563

   @potiuk @kyle-winkelman This seems like a useful addition from the Databricks perspective. Thanks for contributing, Kyle! 🙏
   
   I do think it would be good to look at the XCOM issue that Alex called out. Wouldn't it be better to just use the API to find an existing job for the cases that Alex called out where the current approach doesn't work? To avoid creating duplicate jobs or having the operator fail? The `DatabricksRunNowOperator` already applies [logic like this](https://github.com/apache/airflow/blob/91568b26a448bf15c91dcb4497690d54842678fc/airflow/providers/databricks/hooks/databricks.py#L211), as it allows users to run a given job based on the name of that job (rather than the job id).
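
The lookup-by-name approach suggested here could look roughly like the sketch below. It runs against an in-memory stand-in rather than the real Databricks hook, so `FakeJobsClient`, its methods, and `create_or_reset` are all hypothetical names chosen for illustration. Raising when several jobs share a name is also an assumption about the desired behaviour.

```python
from __future__ import annotations

from typing import Any


class FakeJobsClient:
    """In-memory stand-in for a Jobs API client (hypothetical, for illustration)."""

    def __init__(self) -> None:
        self.jobs: dict[int, dict[str, Any]] = {}
        self._next_id = 1

    def find_job_ids_by_name(self, name: str) -> list[int]:
        return [job_id for job_id, spec in self.jobs.items() if spec.get("name") == name]

    def create(self, spec: dict[str, Any]) -> int:
        job_id = self._next_id
        self._next_id += 1
        self.jobs[job_id] = dict(spec)
        return job_id

    def reset(self, job_id: int, spec: dict[str, Any]) -> None:
        self.jobs[job_id] = dict(spec)


def create_or_reset(client: FakeJobsClient, spec: dict[str, Any]) -> int:
    """Reset the single job with this name, create one if none exists, else raise."""
    matches = client.find_job_ids_by_name(spec["name"])
    if len(matches) > 1:
        # Job names are not unique, so refuse to guess which job to overwrite.
        raise ValueError(f"{len(matches)} jobs named {spec['name']!r}; cannot pick one")
    if matches:
        client.reset(matches[0], spec)
        return matches[0]
    return client.create(spec)
```

Because the job id is looked up on every run, this variant does not depend on a value stored in XCom, and calling it twice with the same name updates the existing job instead of creating a duplicate.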




[GitHub] [airflow] github-actions[bot] commented on pull request #29790: Provider Databricks add jobs create operator.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #29790:
URL: https://github.com/apache/airflow/pull/29790#issuecomment-1668730096

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

