You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/07 12:27:08 UTC

[GitHub] [airflow] norm opened a new pull request, #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

norm opened a new pull request, #26208:
URL: https://github.com/apache/airflow/pull/26208

   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] norm commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
norm commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964876545


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -131,13 +131,61 @@ As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and
         ...
 
 
-If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration::
+If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration:
+
+.. ::
+    Asci art reprepsentation of this diagram

Review Comment:
   ASCII



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965151108


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:

Review Comment:
   ```suggestion
   A dataset is defined by a Uniform Resource Identifier (URI):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965175687


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI as an opaque value intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.

Review Comment:
   why: cus ... human readability is not necessarily a primary goal.  if this ultimately needs to line up with openlineage naming, then human readability may not be a goal at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965174145


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI as an opaque value intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.

Review Comment:
   ```suggestion
   Airflow makes no assumptions about the content or location of the data represented by the identifier. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964998271


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,137 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are three restrictions on the dataset identifier:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+3. It must be unique (although it is case is sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset(uri="airflow://example_dataset")
+    not_ascii = Dataset(uri="èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset(uri="//example/dataset")
+    csv_file = Dataset(uri="example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially URL server credentials, in dataset URIs or extra key values!
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='update_example_dataset', ...) as update_example_dataset:
+        BashOperator(task_id='example_producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='example_consumer', schedule=[example_dataset], ...):
+        BashOperator(...)
+
+Once the ``example_producer`` task of the first ``update_example_dataset`` DAG has completed successfully, Airflow schedules ``requires_example_dataset``. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``requires_example_dataset`` DAG will not be scheduled.

Review Comment:
   ```suggestion
   Once the ``example_producer`` task of the first ``update_example_dataset`` DAG has completed successfully, Airflow schedules ``example_consumer``. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``example_consumer`` DAG will not be scheduled.
   ```
   
   Which dag id do people think is clearer? `example_consumer` or `requires_example_dataset`? (i.e. should we fix the prose or the code sample?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #26208:
URL: https://github.com/apache/airflow/pull/26208#issuecomment-1239590045

   Good enough for now, we can improve this between beta and rc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965142160


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,137 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are three restrictions on the dataset identifier:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+3. It must be unique (although it is case is sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset(uri="airflow://example_dataset")
+    not_ascii = Dataset(uri="èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset(uri="//example/dataset")
+    csv_file = Dataset(uri="example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially URL server credentials, in dataset URIs or extra key values!
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='update_example_dataset', ...) as update_example_dataset:
+        BashOperator(task_id='example_producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='example_consumer', schedule=[example_dataset], ...):
+        BashOperator(...)
+
+Once the ``example_producer`` task of the first ``update_example_dataset`` DAG has completed successfully, Airflow schedules ``requires_example_dataset``. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``requires_example_dataset`` DAG will not be scheduled.

Review Comment:
   I refactored this whole example. Take another look now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964955870


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -134,7 +134,7 @@ As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and
 If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration:
 
 .. ::
-    Asci art reprepsentation of this diagram
+    ACII art representation of this diagram

Review Comment:
   🤦🏻 
   
   ```suggestion
       ASCII art representation of this diagram
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964998271


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,137 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are three restrictions on the dataset identifier:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+3. It must be unique (although it is case is sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset(uri="airflow://example_dataset")
+    not_ascii = Dataset(uri="èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset(uri="//example/dataset")
+    csv_file = Dataset(uri="example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially URL server credentials, in dataset URIs or extra key values!
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='update_example_dataset', ...) as update_example_dataset:
+        BashOperator(task_id='example_producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='example_consumer', schedule=[example_dataset], ...):
+        BashOperator(...)
+
+Once the ``example_producer`` task of the first ``update_example_dataset`` DAG has completed successfully, Airflow schedules ``requires_example_dataset``. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``requires_example_dataset`` DAG will not be scheduled.

Review Comment:
   ```suggestion
   Once the ``example_producer`` task of the first ``update_example_dataset`` DAG has completed successfully, Airflow schedules ``example_consumer``. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``example_consumer`` DAG will not be scheduled.
   ```
   
   Which dag id do people think is clearer? `example_consumer` or `requires_example_dataset`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965183912


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is defined by a Uniform Resource Identifier (URI):
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI as an opaque value intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are two restrictions on the dataset URI:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset("airflow://example_dataset")
+    not_ascii = Dataset("èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset("//example/dataset")
+    csv_file = Dataset("example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values!
+
+The URI is also case sensitive throughout, so ``s3://example_dataset`` and ``s3://Example_Dataset`` are considered different, as is ``s3://example_dataset`` and ``S3://example_dataset``.
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='producer', ...):
+        BashOperator(task_id='producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='consumer', schedule=[example_dataset], ...):
+        ...
+
+Once the ``producer`` task in the ``producer`` DAG has completed successfully, Airflow schedules the ``consumer`` DAG. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``consumer`` DAG will not be scheduled.
+
+Multiple Datasets
+-----------------
+
+As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and the DAG will be scheduled once **all** datasets it consumes have been updated at least once since the last time it was run:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id='multiple_datasets_example',
+        schedule=[
+            example_dataset_1,
+            example_dataset_2,
+            example_dataset_3,
+        ],
+        ...,
+    ):
+        ...
+
+
+If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration:
+
+.. ::
+    ASCII art representation of this diagram
+
+    example_dataset_1   x----x---x---x----------------------x-
+    example_dataset_2   -------x---x-------x------x----x------
+    example_dataset_3   ---------------x-----x------x---------
+    DAG runs created                   *                    *
+
+.. graphviz::
+
+    graph dataset_event_timeline {
+      graph [layout=neato]
+      {
+        node [margin=0 fontcolor=blue width=0.1 shape=point label=""]
+        e1 [pos="1,2.5!"]
+        e2 [pos="2,2.5!"]
+        e3 [pos="2.5,2!"]
+        e4 [pos="4,2.5!"]
+        e5 [pos="5,2!"]
+        e6 [pos="6,2.5!"]
+        e7 [pos="7,1.5!"]
+        r7 [pos="7,1!" shape=star width=0.25 height=0.25 fixedsize=shape]
+        e8 [pos="8,2!"]
+        e9 [pos="9,1.5!"]
+        e10 [pos="10,2!"]
+        e11 [pos="11,1.5!"]
+        e12 [pos="12,2!"]
+        e13 [pos="13,2.5!"]
+        r13 [pos="13,1!" shape=star width=0.25 height=0.25 fixedsize=shape]
+      }
+      {
+        node [shape=none label="" width=0]
+        end_ds1 [pos="14,2.5!"]
+        end_ds2 [pos="14,2!"]
+        end_ds3 [pos="14,1.5!"]
+      }
+
+      {
+        node [shape=none margin=0.25  fontname="roboto,sans-serif"]
+        example_dataset_1 [ pos="-0.5,2.5!"]
+        example_dataset_2 [ pos="-0.5,2!"]
+        example_dataset_3 [ pos="-0.5,1.5!"]
+        dag_runs [label="DagRuns created" pos="-0.5,1!"]
+      }
+
+      edge [color=lightgrey]
+
+      example_dataset_1 -- e1 -- e2       -- e4       -- e6                                        -- e13 -- end_ds1
+      example_dataset_2             -- e3       -- e5             -- e8       -- e10        -- e12        -- end_ds2
+      example_dataset_3                                     -- e7       -- e9        -- e11               -- end_ds3
+
+    }
+
+Notes on schedules
+------------------
+
+The ``schedule`` parameter to your DAG can take either a list of datasets to consume or a timetable-based option. The two cannot currently be mixed.
+
+When using datasets, in this first release (v2.4) waiting for all datasets in the list to be updated is the only option when multiple datasets are consumed by a DAG. A later release should introduce more fine-grained options allowing for greater flexibility.

Review Comment:
   ```suggestion
   When using datasets, in this first release (v2.4) waiting for all datasets in the list to be updated is the only option when multiple datasets are consumed by a DAG. A later release may introduce more fine-grained options allowing for greater flexibility.
   ```
   
   also this might make more sense as a note in the previous section



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #26208:
URL: https://github.com/apache/airflow/pull/26208#issuecomment-1239405583

   Dagrun sequence diagram looks like this now:
   
   ![image](https://user-images.githubusercontent.com/34150/188892820-9e6db0d4-1b27-452a-8193-3b4ca97412c3.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965183912


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is defined by a Uniform Resource Identifier (URI):
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI as an opaque value intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are two restrictions on the dataset URI:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset("airflow://example_dataset")
+    not_ascii = Dataset("èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset("//example/dataset")
+    csv_file = Dataset("example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values!
+
+The URI is also case sensitive throughout, so ``s3://example_dataset`` and ``s3://Example_Dataset`` are considered different, as is ``s3://example_dataset`` and ``S3://example_dataset``.
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='producer', ...):
+        BashOperator(task_id='producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='consumer', schedule=[example_dataset], ...):
+        ...
+
+Once the ``producer`` task in the ``producer`` DAG has completed successfully, Airflow schedules the ``consumer`` DAG. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``consumer`` DAG will not be scheduled.
+
+Multiple Datasets
+-----------------
+
+As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and the DAG will be scheduled once **all** datasets it consumes have been updated at least once since the last time it was run:
+
+.. code-block:: python
+
+    with DAG(
+        dag_id='multiple_datasets_example',
+        schedule=[
+            example_dataset_1,
+            example_dataset_2,
+            example_dataset_3,
+        ],
+        ...,
+    ):
+        ...
+
+
+If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration:
+
+.. ::
+    ASCII art representation of this diagram
+
+    example_dataset_1   x----x---x---x----------------------x-
+    example_dataset_2   -------x---x-------x------x----x------
+    example_dataset_3   ---------------x-----x------x---------
+    DAG runs created                   *                    *
+
+.. graphviz::
+
+    graph dataset_event_timeline {
+      graph [layout=neato]
+      {
+        node [margin=0 fontcolor=blue width=0.1 shape=point label=""]
+        e1 [pos="1,2.5!"]
+        e2 [pos="2,2.5!"]
+        e3 [pos="2.5,2!"]
+        e4 [pos="4,2.5!"]
+        e5 [pos="5,2!"]
+        e6 [pos="6,2.5!"]
+        e7 [pos="7,1.5!"]
+        r7 [pos="7,1!" shape=star width=0.25 height=0.25 fixedsize=shape]
+        e8 [pos="8,2!"]
+        e9 [pos="9,1.5!"]
+        e10 [pos="10,2!"]
+        e11 [pos="11,1.5!"]
+        e12 [pos="12,2!"]
+        e13 [pos="13,2.5!"]
+        r13 [pos="13,1!" shape=star width=0.25 height=0.25 fixedsize=shape]
+      }
+      {
+        node [shape=none label="" width=0]
+        end_ds1 [pos="14,2.5!"]
+        end_ds2 [pos="14,2!"]
+        end_ds3 [pos="14,1.5!"]
+      }
+
+      {
+        node [shape=none margin=0.25  fontname="roboto,sans-serif"]
+        example_dataset_1 [ pos="-0.5,2.5!"]
+        example_dataset_2 [ pos="-0.5,2!"]
+        example_dataset_3 [ pos="-0.5,1.5!"]
+        dag_runs [label="DagRuns created" pos="-0.5,1!"]
+      }
+
+      edge [color=lightgrey]
+
+      example_dataset_1 -- e1 -- e2       -- e4       -- e6                                        -- e13 -- end_ds1
+      example_dataset_2             -- e3       -- e5             -- e8       -- e10        -- e12        -- end_ds2
+      example_dataset_3                                     -- e7       -- e9        -- e11               -- end_ds3
+
+    }
+
+Notes on schedules
+------------------
+
+The ``schedule`` parameter to your DAG can take either a list of datasets to consume or a timetable-based option. The two cannot currently be mixed.
+
+When using datasets, in this first release (v2.4) waiting for all datasets in the list to be updated is the only option when multiple datasets are consumed by a DAG. A later release should introduce more fine-grained options allowing for greater flexibility.

Review Comment:
   ```suggestion
   When using datasets, in this first release (v2.4) waiting for all datasets in the list to be updated is the only option when multiple datasets are consumed by a DAG. A later release may introduce more fine-grained options allowing for greater flexibility.
   ```
   
   This might make more sense as a note in the previous section, where it's more in-context



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965179609


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is defined by a Uniform Resource Identifier (URI):
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI as an opaque value intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are two restrictions on the dataset URI:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset("airflow://example_dataset")
+    not_ascii = Dataset("èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset("//example/dataset")
+    csv_file = Dataset("example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values!
+
+The URI is also case sensitive throughout, so ``s3://example_dataset`` and ``s3://Example_Dataset`` are considered different, as is ``s3://example_dataset`` and ``S3://example_dataset``.
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='producer', ...):
+        BashOperator(task_id='producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='consumer', schedule=[example_dataset], ...):
+        ...
+
+Once the ``producer`` task in the ``producer`` DAG has completed successfully, Airflow schedules the ``consumer`` DAG. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``consumer`` DAG will not be scheduled.

Review Comment:
   ```suggestion
   Once the ``producer`` task in the ``producer`` DAG has completed successfully, Airflow schedules the ``consumer`` DAG. A dataset will be marked as updated only if the task completes successfully — if the task fails or if it is skipped, no update occurs, and the ``consumer`` DAG will not be scheduled.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964929107


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -131,13 +131,61 @@ As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and
         ...
 
 
-If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration::
+If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration:
+
+.. ::
+    Asci art reprepsentation of this diagram

Review Comment:
   Top notch nit picking :D 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964932423


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -131,13 +131,61 @@ As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and
         ...
 
 
-If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration::
+If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration:
+
+.. ::
+    Asci art reprepsentation of this diagram

Review Comment:
   ```suggestion
       ACII art representation of this diagram
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965171198


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.

Review Comment:
   ```suggestion
   An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] norm commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
norm commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964955315


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -134,7 +134,7 @@ As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and
 If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration:
 
 .. ::
-    Asci art reprepsentation of this diagram
+    ACII art representation of this diagram

Review Comment:
   Still ASCII 😁 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965017253


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are three restrictions on the dataset identifier:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+3. It must be unique (although it is case is sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset(uri="airflow://example_dataset")
+    not_ascii = Dataset(uri="èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset(uri="//example/dataset")
+    csv_file = Dataset(uri="example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially URL server credentials, in dataset URIs or extra key values!

Review Comment:
   ```suggestion
       Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values!
   ```



##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,137 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are three restrictions on the dataset identifier:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+3. It must be unique (although it is case is sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset(uri="airflow://example_dataset")
+    not_ascii = Dataset(uri="èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset(uri="//example/dataset")
+    csv_file = Dataset(uri="example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially URL server credentials, in dataset URIs or extra key values!
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='update_example_dataset', ...) as update_example_dataset:
+        BashOperator(task_id='example_producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='example_consumer', schedule=[example_dataset], ...):
+        BashOperator(...)
+
+Once the ``example_producer`` task of the first ``update_example_dataset`` DAG has completed successfully, Airflow schedules ``requires_example_dataset``. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``requires_example_dataset`` DAG will not be scheduled.

Review Comment:
   I think we can probably drop `example_` everywhere too. e.g. use `dag_id=update-dataset` and `task_id=producer`?



##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.

Review Comment:
   ```suggestion
   Airflow treats the dataset URI as an opaque value intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
   ```



##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are three restrictions on the dataset identifier:

Review Comment:
   ```suggestion
   There are three restrictions on the dataset URI:
   ```



##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are three restrictions on the dataset identifier:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+3. It must be unique (although it is case is sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different).

Review Comment:
   I'm not sure the wording is right here? They don't have to be "unique", but casing needs to be consistent.
   
   ```suggestion
   3. The URI is case is sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different.
   ```
   
   I'm not even sure this is a restriction though?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965178315


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is defined by a Uniform Resource Identifier (URI):
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow makes no assumptions about the content or location of the data represented by the identifier. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.

Review Comment:
   ```suggestion
   Airflow makes no assumptions about the content or location of the data represented by the URI. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] norm commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
norm commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964805278


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,137 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+In addition to scheduling DAGs based upon time, they can also be scheduled based upon another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        Task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats the dataset URI identifiers as opaque values intended to be human-readable, and makes no assumptions about the content or location of the data represented by the identifier. They are treated as strings, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
+
+There are three restrictions on the dataset identifier:
+
+1. It must be a valid URI, which means it must only be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+3. It must be unique (although it is case is sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different).
+
+If you try to use either of the examples below, your code will cause a ValueError to be raised, and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset(uri="airflow://example_dataset")
+    not_ascii = Dataset(uri="èxample_datašet")
+
+The identifier does not have to be an absolute URI, it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset(uri="//example/dataset")
+    csv_file = Dataset(uri="example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+..note::
+
+    Security Note: Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially URL server credentials, in dataset URIs or extra key values!
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='update_example_dataset', ...) as update_example_dataset:
+        BashOperator(task_id='example_producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='example_consumer', schedule=[example_dataset], ...):
+        BashOperator(...)
+
+Once the ``example_producer`` task of the first ``update_example_dataset`` DAG has completed successfully, Airflow schedules ``requires_example_dataset``. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``requires_example_dataset`` DAG will not be scheduled.

Review Comment:
   I think `requires_example_dataset` should be `example_consumer`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham merged pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

Posted by GitBox <gi...@apache.org>.
jedcunningham merged PR #26208:
URL: https://github.com/apache/airflow/pull/26208


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org