Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/07 12:55:29 UTC

[GitHub] [airflow] norm commented on a diff in pull request #26208: Update docs for data-aware scheduling (AKA "datasets"/AIP-48)

norm commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r964805278


##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,137 @@
     specific language governing permissions and limitations
     under the License.
 
-Datasets
-========
+Data-aware scheduling
+=====================
 
 .. versionadded:: 2.4
 
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
 
-To use this feature, define a dataset:
+DAGs can be scheduled based on time, and they can also be scheduled based on another DAG updating a dataset.
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dataset_def]
-    :end-before: [END dataset_def]
+.. code-block:: python
 
-Then reference the dataset as a task outlet:
+    from airflow import DAG, Dataset
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :dedent: 4
-    :start-after: [START task_outlet]
-    :end-before: [END task_outlet]
+    with DAG(...):
+        task1 = MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
 
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
 
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
-    :language: python
-    :start-after: [START dag_dep]
-    :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ) as dag2:
+        task2 = OtherOperator(...)
+        ...
 
-You can reference multiple datasets in the DAG's ``schedule`` argument.  Once there has been an update to all of the upstream datasets, the DAG will be triggered.  This means that the DAG will run as frequently as its least-frequently-updated dataset.
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data that flows through multiple DAGs, possibly being changed or updated by each one. Datasets are updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is a construct around a Uniform Resource Identifier (URI) that you create in your code:
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset('s3://dataset-bucket/example.csv')
+
+Airflow treats dataset URIs as opaque, human-readable identifiers and makes no assumptions about the content or location of the data they represent. They are treated as plain strings, so regular expressions (e.g. ``input_\d+.csv``) or file glob patterns (e.g. ``input_2022*.csv``) will not work as a way to create multiple datasets from one declaration.
+
+There are three restrictions on the dataset identifier:
+
+1. It must be a valid URI, which means it must be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+3. It must be unique (although it is case-sensitive throughout, so "s3://example_dataset" and "s3://Example_Dataset" are considered different, and "s3://example_dataset" and "S3://example_dataset" are considered different).
+
+If you use either of the examples below, your code raises a ``ValueError`` and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset(uri="airflow://example_dataset")
+    not_ascii = Dataset(uri="èxample_datašet")
+
+The identifier does not have to be an absolute URI; it can be a scheme-less, relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset(uri="//example/dataset")
+    csv_file = Dataset(uri="example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={'team': 'trainees'},
+    )
+
+.. note::
+
+    Security Note: Dataset URI and extra fields are not encrypted; they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially URL server credentials, in dataset URIs or extra key values!
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id='update_example_dataset', ...) as update_example_dataset:
+        BashOperator(task_id='example_producer', outlets=[example_dataset], ...)
+
+    with DAG(dag_id='example_consumer', schedule=[example_dataset], ...):
+        BashOperator(...)
+
+Once the ``example_producer`` task of the first ``update_example_dataset`` DAG has completed successfully, Airflow schedules ``requires_example_dataset``. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``requires_example_dataset`` DAG will not be scheduled.

Review Comment:
   I think `requires_example_dataset` should be `example_consumer`?
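
For context, the behaviour that paragraph describes can be sketched in plain Python. This is a toy model, not Airflow's API: `ToyScheduler`, `register_consumer`, and `task_finished` are hypothetical names invented for illustration. It only shows that the DAG triggered after a successful producer task is the one declared with the id `example_consumer` in the snippet under review, and that a failed or skipped task triggers nothing.

```python
from dataclasses import dataclass, field


@dataclass
class ToyScheduler:
    """Toy stand-in for the documented behaviour; NOT Airflow's scheduler."""

    # Maps a dataset URI to the ids of the DAGs scheduled on it.
    consumers: dict = field(default_factory=dict)
    # Ids of consumer DAGs that have been triggered, in order.
    triggered: list = field(default_factory=list)

    def register_consumer(self, dag_id, uri):
        """Record that `dag_id` is scheduled on the dataset `uri`."""
        self.consumers.setdefault(uri, []).append(dag_id)

    def task_finished(self, outlets, state):
        """Handle a finished producer task and the datasets it declares."""
        # Only a successful task counts as a dataset update.
        if state != "success":
            return
        for uri in outlets:
            self.triggered.extend(self.consumers.get(uri, []))


uri = "s3://dataset/example.csv"
scheduler = ToyScheduler()
scheduler.register_consumer("example_consumer", uri)

# A failed or skipped producer task does not update the dataset.
scheduler.task_finished([uri], state="failed")
scheduler.task_finished([uri], state="skipped")
triggered_before_success = list(scheduler.triggered)

# A successful producer task triggers the consumer DAG.
scheduler.task_finished([uri], state="success")
```

After the last call, `scheduler.triggered` holds `example_consumer`, which is the name the prose should use.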


