Posted to commits@airflow.apache.org by "michaelmicheal (via GitHub)" <gi...@apache.org> on 2023/02/08 19:05:38 UTC

[GitHub] [airflow] michaelmicheal opened a new pull request, #29433: Add dataset update endpoint

michaelmicheal opened a new pull request, #29433:
URL: https://github.com/apache/airflow/pull/29433

   Closes: [#29162](https://github.com/apache/airflow/issues/29162)
   
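   Adds a REST API endpoint (`POST /api/v1/datasets/events`) that lets external services register dataset changes and thereby trigger downstream DAG runs.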
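A minimal client-side sketch of calling the endpoint. It assumes the path and JSON keys used in this PR's test (`/api/v1/datasets/events`, `dataset_uri`, `timestamp`, `extra`). The helper name `build_dataset_event_request` is hypothetical, and the request is only built here, not sent.

```python
from __future__ import annotations

import json
import urllib.request
from datetime import datetime, timezone


def build_dataset_event_request(
    base_url: str,
    dataset_uri: str,
    extra: dict | None = None,
    timestamp: datetime | None = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST to the dataset-event endpoint proposed in this PR."""
    body: dict = {"dataset_uri": dataset_uri}
    if extra is not None:
        body["extra"] = extra
    if timestamp is not None:
        # Send an explicit UTC offset so the server never has to guess the timezone.
        body["timestamp"] = timestamp.astimezone(timezone.utc).isoformat()
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/v1/datasets/events",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending it is then a matter of `urllib.request.urlopen(req)`. Authentication depends on the configured API auth backend and is deliberately left out of this sketch.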
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197375153


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)
+    timestamp = json_body.get("timestamp", datetime.now())
+    timestamp = timestamp.astimezone(timezone.utc)
+    extra = json_body.get("extra", {})
+    dataset_event = dataset_manager.register_external_dataset_change(
+        dataset=Dataset(uri),
+        external_source=external_source,
+        user_id=user_id,
+        timestamp=timestamp,
+        extra=extra,
+        session=session,
+    )
+
+    if dataset_event:

Review Comment:
   I wish it threw a helpful exception when the dataset doesn't exist, instead of signaling that with None. Though I don't think there's anything we can do about that now, can we, @uranusjr?
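A sketch of the alternative being wished for: the manager raises on an unknown URI and the endpoint maps that to a 404, instead of checking for None. `DatasetNotFound` and `InMemoryDatasetManager` are hypothetical stand-ins, not Airflow APIs. The real manager queries `DatasetModel`, as the diff further down shows.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from http import HTTPStatus


class DatasetNotFound(Exception):
    """Hypothetical exception raised instead of returning None for an unknown URI."""

    def __init__(self, uri: str) -> None:
        super().__init__(f"Dataset with uri {uri!r} not found")
        self.uri = uri


@dataclass
class InMemoryDatasetManager:
    """Toy stand-in for the dataset manager; known URIs replace the DatasetModel query."""

    known_uris: set[str]
    events: list[dict] = field(default_factory=list)

    def register_dataset_change(self, *, uri: str, extra: dict | None = None) -> dict:
        if uri not in self.known_uris:
            raise DatasetNotFound(uri)
        event = {"dataset_uri": uri, "extra": extra or {}}
        self.events.append(event)
        return event


def post_dataset_event(manager: InMemoryDatasetManager, body: dict) -> tuple[int, dict]:
    """Endpoint-shaped wrapper: translate the exception into a 404 response."""
    try:
        event = manager.register_dataset_change(uri=body["dataset_uri"], extra=body.get("extra"))
    except DatasetNotFound as err:
        return HTTPStatus.NOT_FOUND, {"detail": str(err)}
    return HTTPStatus.OK, event
```

The error carries the offending URI, so the API response can say exactly what was missing rather than relying on a log warning.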





[GitHub] [airflow] michaelmicheal commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "michaelmicheal (via GitHub)" <gi...@apache.org>.
michaelmicheal commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1119322146


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +126,39 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:

Review Comment:
   I followed the same pattern as dag_run. I think posting more than one dataset event at once may overcomplicate edge cases, such as a missing URI or a failure to register.
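To illustrate the edge cases mentioned, here is a sketch of a hypothetical batch variant (not part of the PR). Once several URIs arrive together, a partial failure forces a response-code decision that the single-event endpoint never faces. Using 207 (Multi-Status) for mixed outcomes is an assumption made only for this sketch; rolling back the whole batch would be another option.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class BatchResult:
    registered: list[str] = field(default_factory=list)
    missing: list[str] = field(default_factory=list)

    @property
    def status_code(self) -> int:
        # All succeeded -> 200, none succeeded -> 404, mixed -> 207 (a design choice, not a rule).
        if not self.missing:
            return 200
        if not self.registered:
            return 404
        return 207


def register_batch(uris: list[str], known_uris: set[str]) -> BatchResult:
    """Sort each URI into registered or missing; a real version would also write events."""
    result = BatchResult()
    for uri in uris:
        (result.registered if uri in known_uris else result.missing).append(uri)
    return result
```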





[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197383833


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr

Review Comment:
   I am not sure that remote_addr is the right choice here. Maybe we could record such information in the log table? To me it seems more useful to let this be an arbitrary text field, although I suppose the user can always supply information in the `extra` dict... wdyt?
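One concrete reason `request.remote_addr` can mislead: behind a reverse proxy or load balancer, it is the proxy's address rather than the caller's. In Flask apps, Werkzeug's `ProxyFix` middleware (its `x_for` parameter) is the usual fix. The sketch below shows the same logic by hand, assuming a known number of trusted proxies that each append to `X-Forwarded-For`. `client_ip` is a hypothetical helper.

```python
from __future__ import annotations


def client_ip(remote_addr: str | None, x_forwarded_for: str | None, trusted_proxies: int) -> str | None:
    """Return the caller's address as seen by the outermost trusted proxy.

    Each proxy appends the address it received the request from, so with N trusted
    proxies the real client is the N-th entry from the right. Anything further left
    was supplied by the client itself and cannot be trusted.
    """
    if trusted_proxies <= 0 or not x_forwarded_for:
        return remote_addr
    hops = [h.strip() for h in x_forwarded_for.split(",") if h.strip()]
    if len(hops) < trusted_proxies:
        # The header is shorter than the expected proxy chain; fall back instead of guessing.
        return remote_addr
    return hops[-trusted_proxies]
```

Even when resolved correctly, an IP can be shared (NAT, CI runners), which supports recording it for auditing rather than treating it as the event's identity.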





[GitHub] [airflow] dimberman commented on pull request #29433: Add dataset update endpoint

Posted by "dimberman (via GitHub)" <gi...@apache.org>.
dimberman commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1446856635

   LGTM once @bolkedebruin's comments are addressed.




[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197380525


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)
+    timestamp = json_body.get("timestamp", datetime.now())
+    timestamp = timestamp.astimezone(timezone.utc)
+    extra = json_body.get("extra", {})
+    dataset_event = dataset_manager.register_external_dataset_change(
+        dataset=Dataset(uri),
+        external_source=external_source,
+        user_id=user_id,
+        timestamp=timestamp,
+        extra=extra,
+        session=session,
+    )
+
+    if dataset_event:

Review Comment:
   Since the params are keyword-only, I reckon we could make task_instance optional.
   
   And, thankfully, since it accepts `**kwargs`, adding more params at the call site in Airflow won't break anything for "old" custom dataset managers.
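A sketch of the compatibility argument, using hypothetical toy classes rather than Airflow's real `DatasetManager`. New keywords such as `user_id` are absorbed by `**kwargs` in an old override. One caveat: an old override that declares `task_instance` without a default still raises `TypeError` if the call site omits it, so passing `task_instance=None` explicitly is the safer call shape.

```python
from __future__ import annotations

from typing import Any


class DatasetManager:
    """Toy base class (hypothetical); only the signature shape matters here."""

    def register_dataset_change(
        self,
        *,
        task_instance: Any = None,  # optional: None means "reported from outside Airflow"
        dataset: str,
        extra: dict | None = None,
        **kwargs: Any,  # tolerates keywords added by future call sites
    ) -> dict:
        source = "external" if task_instance is None else "task"
        return {"dataset": dataset, "extra": extra or {}, "source": source}


class OldCustomManager(DatasetManager):
    """A custom manager written against the old signature, where task_instance was required."""

    def register_dataset_change(self, *, task_instance: Any, dataset: str, **kwargs: Any) -> dict:
        return {"dataset": dataset, "custom": True, "ignored": sorted(kwargs)}
```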





[GitHub] [airflow] github-actions[bot] commented on pull request #29433: Add dataset update endpoint

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1619283233

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197341988


##########
tests/api_connexion/endpoints/test_dataset_endpoint.py:
##########
@@ -598,3 +606,45 @@ def test_should_return_conf_max_if_req_max_above_conf(self, session):
 
         assert response.status_code == 200
         assert len(response.json["dataset_events"]) == 150
+
+
+class TestPostDatasetEvents(TestDatasetEndpoint):
+    def test_should_respond_200(self, session):
+        self._create_dataset(session)
+
+        response = self.client.post(
+            "/api/v1/datasets/events",
+            json={"dataset_uri": "s3://bucket/key", "timestamp": self.default_time, "extra": {"foo": "bar"}},
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 200
+        assert response.json == {
+            "id": ANY,
+            "dataset_id": 1,
+            "dataset_uri": "s3://bucket/key",
+            "external_source": ANY,
+            "extra": {"foo": "bar"},
+            "source_dag_id": None,
+            "source_task_id": None,
+            "source_run_id": None,
+            "source_map_index": -1,
+            "user_id": ANY,
+            "timestamp": self.default_time,
+        }
+
+    def test_should_raises_401_unauthenticated(self, session):

Review Comment:
   ```suggestion
       def test_should_raise_401_unauthenticated(self, session):
   ```






[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197376110


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)
+    timestamp = json_body.get("timestamp", datetime.now())
+    timestamp = timestamp.astimezone(timezone.utc)
+    extra = json_body.get("extra", {})
+    dataset_event = dataset_manager.register_external_dataset_change(
+        dataset=Dataset(uri),
+        external_source=external_source,
+        user_id=user_id,
+        timestamp=timestamp,
+        extra=extra,
+        session=session,
+    )
+
+    if dataset_event:

Review Comment:
   Oh wait... this is a new method, so we could. But wait, do we even need a new method?





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "ephraimbuddy (via GitHub)" <gi...@apache.org>.
ephraimbuddy commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1101047172


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -1919,6 +1919,32 @@ paths:
           $ref: '#/components/responses/PermissionDenied'
         '404':
           $ref: '#/components/responses/NotFound'
+    post:
+      summary: Post dataset event
+      description: Post dataset events
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
+      operationId: post_dataset_event
+      tags: [ Dataset ]

Review Comment:
   ```suggestion
         tags: [Dataset]
   ```



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +128,29 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create connection entry."""

Review Comment:
   ```suggestion
       """Create a dataset event"""
   ```



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -1919,6 +1919,32 @@ paths:
           $ref: '#/components/responses/PermissionDenied'
         '404':
           $ref: '#/components/responses/NotFound'
+    post:
+      summary: Post dataset event
+      description: Post dataset events

Review Comment:
   ```suggestion
         description: Post dataset event
   ```





[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197348367


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   Yeah good point, this should probably just be a fk.
   
   ---
   
   Edit: Using an fk has problems when a user is deleted, though. We probably don’t want to lose the triggering history in that case.
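The trade-off can be seen directly in SQL. The sketch below uses the standard-library `sqlite3` module with illustrative tables loosely modeled on Flask-AppBuilder's `ab_user`. `ON DELETE SET NULL` keeps the event but forgets who triggered it, while `RESTRICT` refuses the deletion (the option dstandish raises later in the thread). `CASCADE` would delete the event history outright.

```python
import sqlite3


def make_db(on_delete: str) -> sqlite3.Connection:
    """Create a toy user/event schema; on_delete is a fixed, trusted value in this demo."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK enforcement off by default
    conn.execute("CREATE TABLE ab_user (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE dataset_event ("
        " id INTEGER PRIMARY KEY,"
        f" user_id INTEGER REFERENCES ab_user(id) ON DELETE {on_delete})"
    )
    conn.execute("INSERT INTO ab_user (id, username) VALUES (1, 'alice')")
    conn.execute("INSERT INTO dataset_event (id, user_id) VALUES (10, 1)")
    return conn
```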





[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197377059


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)
+    timestamp = json_body.get("timestamp", datetime.now())
+    timestamp = timestamp.astimezone(timezone.utc)
+    extra = json_body.get("extra", {})
+    dataset_event = dataset_manager.register_external_dataset_change(
+        dataset=Dataset(uri),
+        external_source=external_source,
+        user_id=user_id,
+        timestamp=timestamp,
+        extra=extra,
+        session=session,
+    )
+
+    if dataset_event:

Review Comment:
   @michaelmicheal why do we need a new method for this?  could we not add params to `register_dataset_change`?





[GitHub] [airflow] dimberman commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dimberman (via GitHub)" <gi...@apache.org>.
dimberman commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1104614103


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +128,29 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create a dataset event"""

Review Comment:
   Can you please add a more detailed description of what this function does?



##########
airflow/datasets/manager.py:
##########
@@ -55,23 +55,31 @@ def register_dataset_change(
         dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
         if not dataset_model:
             self.log.warning("DatasetModel %s not found", dataset)
-            return
-        session.add(
-            DatasetEvent(
+            return None
+
+        if task_instance:
+            dataset_event = DatasetEvent(
                 dataset_id=dataset_model.id,
                 source_task_id=task_instance.task_id,
                 source_dag_id=task_instance.dag_id,
                 source_run_id=task_instance.run_id,
                 source_map_index=task_instance.map_index,
                 extra=extra,
             )
-        )
+        else:
+            dataset_event = DatasetEvent(
+                dataset_id=dataset_model.id,
+                extra=extra,
+            )
+        session.add(dataset_event)

Review Comment:
   Please add a comment here explaining why we would create a dataset event without a task instance. I think that would make it easier for folks reading this code in the future.
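A sketch of the branch above with the requested explanatory comment. It uses a plain dataclass as a stand-in for the `DatasetEvent` ORM model. The defaults (`source_*` fields as None, `source_map_index` as -1) are taken from the expected JSON in this PR's endpoint test. `build_dataset_event` is a hypothetical helper. A single function like this is also one way to answer the earlier question of extending `register_dataset_change` instead of adding a new method.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class DatasetEventRecord:
    """Stand-in for the DatasetEvent row; defaults mirror the API test's expected response."""

    dataset_id: int
    extra: dict = field(default_factory=dict)
    source_dag_id: str | None = None
    source_task_id: str | None = None
    source_run_id: str | None = None
    source_map_index: int = -1


def build_dataset_event(
    dataset_id: int, *, task_instance: Any = None, extra: dict | None = None
) -> DatasetEventRecord:
    # No task instance means the change was reported from outside Airflow (e.g. through
    # the REST endpoint), so no DAG, task, or run produced it and the source_* fields stay empty.
    if task_instance is None:
        return DatasetEventRecord(dataset_id=dataset_id, extra=extra or {})
    return DatasetEventRecord(
        dataset_id=dataset_id,
        extra=extra or {},
        source_dag_id=task_instance.dag_id,
        source_task_id=task_instance.task_id,
        source_run_id=task_instance.run_id,
        source_map_index=task_instance.map_index,
    )
```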





[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1118401275


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +126,39 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = json_body["external_source"]
+    external_service_id = json_body["external_service_id"]

Review Comment:
   Same as above: this should not be up to the client to decide, but should be obtained at a different layer (HTTP, application, etc.).



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +126,39 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = json_body["external_source"]

Review Comment:
   I don't think we should leave this open to the client; we should get this from the request, e.g. REMOTE_ADDR or a registered api_client. Allowing clients to set this puts us at a security risk, as its integrity can no longer be verified.
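A sketch of keeping provenance server-owned: the body may carry only the URI and `extra`, and any attempt to supply provenance fields is rejected. The field names come from the earlier revision quoted above (`external_source`, `external_service_id`). `build_event_fields` is hypothetical, and in the endpoint its `ValueError` would become a `BadRequest`.

```python
from __future__ import annotations

# Fields describing *who* sent the event; per the review, the server fills these in.
SERVER_OWNED_FIELDS = frozenset({"external_source", "external_service_id", "user_id"})


def build_event_fields(body: dict, *, remote_addr: str | None, user_id: int | None) -> dict:
    """Merge a client request body with provenance taken from the request context."""
    supplied = SERVER_OWNED_FIELDS.intersection(body)
    if supplied:
        # Rejecting is more transparent than silently overwriting what the client sent.
        raise ValueError(f"fields set by the server cannot be supplied: {sorted(supplied)}")
    return {
        "dataset_uri": body["dataset_uri"],
        "extra": body.get("extra", {}),
        "external_source": remote_addr,
        "user_id": user_id,
    }
```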





[GitHub] [airflow] michaelmicheal commented on pull request #29433: Add dataset update endpoint

Posted by "michaelmicheal (via GitHub)" <gi...@apache.org>.
michaelmicheal commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1462140546

   > For example, I expect the majority of usage to come from cloud storage integration. S3 (+Minio), GCS, and ABS all use their own callback schemas, and ideally we would allow providers to register these kinds of callbacks. The question becomes how to 'detect' which service we are integrating with, without creating a lot of work for ourselves by exposing every flavor of callback as a separate API.
   
   In my mind, this PR simply solidifies Dataset integration capabilities.
   The point you raise about detecting which service we are integrating with is a valid question, and IMO warrants its own issue.
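One way the two concerns could stay separate: a provider-side adapter translates its service's callback into dataset URIs, and each URI is then posted to the single generic endpoint from this PR. The sketch assumes the standard S3 event notification JSON shape (`Records[].s3.bucket.name`, `Records[].s3.object.key`), where object keys arrive URL-encoded. `s3_notification_to_dataset_uris` is hypothetical and not an existing provider API.

```python
from __future__ import annotations

from urllib.parse import unquote_plus


def s3_notification_to_dataset_uris(notification: dict) -> list[str]:
    """Translate an S3 event notification into dataset URIs of the form s3://bucket/key."""
    uris = []
    for record in notification.get("Records", []):
        s3 = record.get("s3", {})
        bucket = s3.get("bucket", {}).get("name")
        key = s3.get("object", {}).get("key")
        if bucket and key:
            # S3 URL-encodes object keys in notifications (a space arrives as "+").
            uris.append(f"s3://{bucket}/{unquote_plus(key)}")
    return uris
```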




[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197303230


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -16,20 +16,30 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime, timezone
+
+from flask import request
+from flask_login import current_user
+from marshmallow import ValidationError
 from sqlalchemy import func
 from sqlalchemy.orm import Session, joinedload, subqueryload
 
+from airflow import Dataset

Review Comment:
   ```suggestion
   from airflow.datasets import Dataset
   ```





[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197391932


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   I won't stand in the way here, and I don't really mind, but if you'll humor me I'll think it through out loud...
   
   Maybe I'm just hung up on the standard practice of using surrogate keys, normalization, etc., but...
   
   So, Airflow provides a mechanism to deactivate users. Interestingly, the user edit form even says `don't delete users; it's a bad practice; just deactivate them`.
   
   <img width="829" alt="image" src="https://github.com/apache/airflow/assets/15932138/bcff2dd8-0c8e-47c0-b4fe-de8d0493290c">
   
   Additionally, a username can be changed. So I could take some action, change my username, and now you don't know that I took that action.
   
   Additionally, you could delete a user and have a new user added with the same username, but it'd be a different "user".
   
   I think your point that having a username is more useful than having a null is obviously true. But my thought is that it's not a situation that should happen, because users should not be deleted (by keeping them, of course, we don't run into these problems), and if an Airflow cluster admin deletes them anyway, well, that's up to them. But presuming they don't, you get the benefits of referential integrity.
   
   Interestingly, the log table does not have a user_id column, which is a bit weird... it probably should... but there too I'd say it would make sense to record the DB ID.
   
   Another option would be to _reject_ the deletion of a user when there are associated log or dataset events. That would seem reasonable too.
   
   So yeah, I think I've convinced myself a bit more that using the ID is the right way. I think the mutability of usernames is a strong argument in light of security / auditing concerns. But let me know what you think.
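The username-mutability argument can be made concrete with a small sketch. It uses Python's built-in `sqlite3` with simplified, hypothetical `ab_user` and `dataset_event` tables (not Airflow's real schema). Each event stores both a user ID and a plain-text username, so the two attribution strategies can be compared after a rename and after a name is reused.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ab_user (id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL);
    -- Hypothetical, simplified event table: one column per attribution strategy.
    CREATE TABLE dataset_event (
        id INTEGER PRIMARY KEY,
        user_id INTEGER REFERENCES ab_user (id),
        username TEXT
    );
    INSERT INTO ab_user (id, username) VALUES (1, 'alice');
    INSERT INTO dataset_event (id, user_id, username) VALUES (1, 1, 'alice');
    """
)


def actor_by_id(event_id: int):
    """Resolve the acting account through the surrogate key."""
    row = conn.execute(
        "SELECT u.username FROM dataset_event e JOIN ab_user u ON u.id = e.user_id WHERE e.id = ?",
        (event_id,),
    ).fetchone()
    return row[0] if row else None


def actor_by_username(event_id: int):
    """Resolve the acting account through the stored username."""
    row = conn.execute(
        "SELECT u.id FROM dataset_event e JOIN ab_user u ON u.username = e.username WHERE e.id = ?",
        (event_id,),
    ).fetchone()
    return row[0] if row else None


# The user renames themselves after taking the action.
conn.execute("UPDATE ab_user SET username = 'mallory' WHERE id = 1")
print(actor_by_id(1))        # mallory: same account, shown under its current name
print(actor_by_username(1))  # None: the stored name no longer matches anyone

# A different person later registers the freed-up name.
conn.execute("INSERT INTO ab_user (id, username) VALUES (2, 'alice')")
print(actor_by_username(1))  # 2: the event is now attributed to the wrong account
```

With the ID, recovering the current display name is just a join. With the username, both the rename and the reuse silently break the audit trail.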





[GitHub] [airflow] DjVinnii commented on pull request #29433: Add dataset update endpoint

Posted by "DjVinnii (via GitHub)" <gi...@apache.org>.
DjVinnii commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1552578939

   > Yeah, it's a good idea @DjVinnii, perhaps you'd be interested in contributing that? Although, if it does not exist, then nothing on the cluster is using it, so it would not really have any effect.
   
   Our use case is to be able to synchronize datasets between multiple Airflow instances, so that consumers only have to know the dataset name and don't have to know which instance the producer DAG runs in. At the moment we are creating and updating datasets across Airflow instances using a hacky dummy DAG that produces the dataset in the other instances, but an API seems way more robust.
   
   I'm willing to give this a try and contribute.
   



[GitHub] [airflow] dstandish commented on pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1552401507

   > This feature looks like something we can use. I had a quick look and one thing came to mind:
   > 
   > It looks like it is not possible to create a remote dataset event if the dataset does not exist already. Is that correct? In my opinion, it would then also be nice if we could create a dataset by using the API.
   
   Yeah, it's a good idea @DjVinnii, perhaps you'd be interested in contributing that? Although, if it does not exist, then nothing on the cluster is using it, so it would not really have any effect.
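Here is a rough sketch of the behaviour being discussed. It uses a hypothetical in-memory registry instead of the real `DatasetModel` table. An unknown URI is logged and ignored, mirroring the `register_dataset_change` diff later in this thread. A hypothetical `create_if_missing` flag shows why creating the dataset on the fly would, on its own, queue nothing: no DAG is scheduled on it yet.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass, field

log = logging.getLogger(__name__)


@dataclass
class DatasetRegistry:
    """Hypothetical stand-in for the dataset table plus the DAGs scheduled on each URI."""

    consumers: dict[str, list[str]] = field(default_factory=dict)  # uri -> consuming DAG ids
    events: list[str] = field(default_factory=list)  # URIs of recorded dataset events

    def register_change(self, uri: str, create_if_missing: bool = False) -> list[str] | None:
        """Record an event for ``uri`` and return the DAG ids it would trigger."""
        if uri not in self.consumers:
            if not create_if_missing:
                # Current behaviour in the PR: warn and record nothing.
                log.warning("Dataset %s not found", uri)
                return None
            # Newly created dataset: nothing is scheduled on it yet.
            self.consumers[uri] = []
        self.events.append(uri)
        return list(self.consumers[uri])


registry = DatasetRegistry(consumers={"s3://bucket/orders": ["load_orders"]})
print(registry.register_change("s3://bucket/orders"))  # ['load_orders']
print(registry.register_change("s3://bucket/new"))  # None
print(registry.register_change("s3://bucket/new", create_if_missing=True))  # []
```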



[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197341106


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   Isn't the ID how a typical FK->PK relationship is defined?
   It seems appropriate to add user_id and make it an FK to the users table. Then one could always look up the username by joining?




[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1198276430


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   Brought this up on Slack. The suggestion is not to add a user column at all right now, since user handling will change with AIP-56. For auditing purposes, you can add a record to the Log table instead.
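A minimal sketch of the "audit via the Log table" alternative, using Python's built-in `sqlite3`. The `log` table here is a simplified, hypothetical stand-in (Airflow's actual `Log` model has more columns), and the event name `api.post_dataset_event` is an assumed label. The point is only that the actor is written to an audit row rather than to a new column on the dataset event.

```python
import json
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
# Simplified, hypothetical audit table; not Airflow's real Log schema.
conn.execute(
    "CREATE TABLE log (id INTEGER PRIMARY KEY, dttm TEXT, event TEXT, owner TEXT, extra TEXT)"
)


def record_dataset_event_audit(owner: str, dataset_uri: str, remote_addr: str) -> int:
    """Write one audit row for an externally created dataset event and return its id."""
    cur = conn.execute(
        "INSERT INTO log (dttm, event, owner, extra) VALUES (?, ?, ?, ?)",
        (
            datetime.now(timezone.utc).isoformat(),
            "api.post_dataset_event",  # assumed event label
            owner,
            json.dumps({"dataset_uri": dataset_uri, "remote_addr": remote_addr}),
        ),
    )
    conn.commit()
    return cur.lastrowid


row_id = record_dataset_event_audit("alice", "s3://bucket/orders", "10.0.0.5")
event, owner, extra = conn.execute(
    "SELECT event, owner, extra FROM log WHERE id = ?", (row_id,)
).fetchone()
print(event, owner, json.loads(extra)["dataset_uri"])
```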




[GitHub] [airflow] michaelmicheal commented on pull request #29433: Add dataset update endpoint

Posted by "michaelmicheal (via GitHub)" <gi...@apache.org>.
michaelmicheal commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1424297531

   > I'm not sure of this, like the broadcasted event has no source dag/task etc. cc: @dstandish
   
   There are a few reasons why I think it's super important to at least support (not necessarily encourage) external dataset changes.
   
   1. Integrate with external services and non-Airflow components of a pipeline. If a data science team has an external component of an ETL pipeline (for example, data warehouse ingestion), these external services should be able to trigger workflows that depend on a dataset when they update it externally.
   2. Support multi-instance Airflow architectures. With Astro, Cloud Composer, and custom solutions (like ours at Shopify), using multiple Airflow instances in production is very common. When one layer of the data platform is orchestrated in one instance and another layer is orchestrated in a different instance, we rely on being able to broadcast dataset changes between Airflow instances. We need this integration to be able to pass dataset changes between Airflow instances through the API.
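For the multi-instance case, the sending side could be as small as the sketch below. It only builds the HTTP request with Python's standard `urllib.request` and does not send it. The path `/api/v1/datasets/events`, the `extra` key, and HTTP basic auth are assumptions for illustration. The route this PR registers is not shown in the thread, and which auth scheme works depends on the receiving instance's configured API auth backend. Only `dataset_uri` matches the key the endpoint diff reads.

```python
from __future__ import annotations

import base64
import json
import urllib.request


def build_dataset_event_request(
    base_url: str, dataset_uri: str, username: str, password: str, extra: dict | None = None
) -> urllib.request.Request:
    """Build (but do not send) a POST reporting a dataset change to another Airflow instance.

    Assumptions: the route "/api/v1/datasets/events", the "extra" key, and basic auth.
    """
    payload: dict = {"dataset_uri": dataset_uri}
    if extra is not None:
        payload["extra"] = extra
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/datasets/events",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )


req = build_dataset_event_request(
    "https://airflow-b.example.com/", "s3://warehouse/orders", "svc", "secret", extra={"rows": 42}
)
# Sending it would be: urllib.request.urlopen(req, timeout=30)
```

The URI has to match the receiving instance's dataset exactly, since the dataset is looked up by `DatasetModel.uri`.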



[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1133576482


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -143,7 +145,7 @@ def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
     uri = json_body["dataset_uri"]
     external_source = request.remote_addr
     user_id = getattr(current_user, "id", None)
-    timestamp = json_body["timestamp"]
+    timestamp = json_body.get("timestamp").astimezone(timezone.utc)

Review Comment:
   This should use `[]` instead of `get()` unless you provide a non-None default.
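The difference is easy to see in isolation:

- With `.get()` and no default, a missing field surfaces as an `AttributeError` on `None`, far from its cause.
- `[]` fails with a `KeyError` that names the field.
- If the field is meant to be optional, `.get()` needs a real default.

The `event_timestamp` helper below is hypothetical, and defaulting to the current UTC time is an assumed policy.

```python
from datetime import datetime, timezone

body = {"dataset_uri": "s3://bucket/orders"}  # client omitted "timestamp"

# .get() without a default: the failure is an AttributeError about NoneType.
try:
    body.get("timestamp").astimezone(timezone.utc)
except AttributeError as err:
    get_error = err

# [] indexing: the failure is a KeyError that names the missing field.
try:
    body["timestamp"]
except KeyError as err:
    index_error = err


def event_timestamp(json_body: dict) -> datetime:
    """Hypothetical helper: treat "timestamp" as optional and default to now (UTC)."""
    return json_body.get("timestamp", datetime.now(timezone.utc)).astimezone(timezone.utc)


print(type(get_error).__name__, index_error)  # AttributeError 'timestamp'
```

Alternatively, the marshmallow field could declare the default itself (recent marshmallow 3.x versions accept `load_default`), which keeps `[]` safe in the endpoint. Note also that `astimezone()` on a naive datetime assumes the server's local time, so naive client timestamps would ideally be rejected or normalised by the schema.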




[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197371171


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   Setting this to a (non-FK) username would be much more useful than having a null, IMO.




[GitHub] [airflow] marclamberti commented on pull request #29433: Add dataset update endpoint

Posted by "marclamberti (via GitHub)" <gi...@apache.org>.
marclamberti commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1642348710

   Is this feature going to be merged for 2.7?



[GitHub] [airflow] github-actions[bot] closed pull request #29433: Add dataset update endpoint

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #29433: Add dataset update endpoint
URL: https://github.com/apache/airflow/pull/29433



[GitHub] [airflow] bolkedebruin commented on pull request #29433: Add dataset update endpoint

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1445912344

   So, I like where this is going, but I'd like some extra robustness / proper security (see above). Furthermore, we need to think about how this API will be used.
   
   For example, I expect the majority of usage to come from cloud storage integration. S3 (+MinIO), GCS, and ABS all use their own callback schemas, so ideally we would allow providers to register these kinds of callbacks. The question becomes how to 'detect' which service we are integrating with, without creating a lot of work for ourselves by needing to expose every flavor of callback as a separate API. I quite understand that this is beyond the scope of your PR, but it gives us a point on the horizon to aim for, so to speak.
   
   I *think* that with the security concerns addressed and unit tests added, it looks mergeable. I'm a bit concerned about the schema and schema evolution. How's that going to work?



[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1118421306


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +126,39 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:

Review Comment:
   Why can we only parse one event at a time?
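Accepting a batch is mostly a validation question. The sketch below assumes the body may be either a single event object or a JSON list of them. That is a hypothetical payload shape, not what the PR's `dataset_change_schema` accepts. Every item is validated before anything is returned for writing, so one bad entry rejects the whole batch instead of leaving a partial write.

```python
from __future__ import annotations


def validate_event(item: object) -> dict:
    """Hypothetical per-event check: require a non-empty string ``dataset_uri``."""
    if not isinstance(item, dict):
        raise ValueError(f"expected an object, got {type(item).__name__}")
    uri = item.get("dataset_uri")
    if not isinstance(uri, str) or not uri:
        raise ValueError("dataset_uri must be a non-empty string")
    return {"dataset_uri": uri, "extra": item.get("extra") or {}}


def load_events(body: object) -> list[dict]:
    """Accept one event or a list of events; validate all of them before returning any."""
    items = body if isinstance(body, list) else [body]
    events, errors = [], {}
    for index, item in enumerate(items):
        try:
            events.append(validate_event(item))
        except ValueError as err:
            errors[index] = str(err)
    if errors:
        # All-or-nothing: report every bad index, write nothing.
        raise ValueError(errors)
    return events


single = load_events({"dataset_uri": "s3://bucket/orders"})
batch = load_events([{"dataset_uri": "s3://a"}, {"dataset_uri": "s3://b", "extra": {"rows": 3}}])
try:
    load_events([{"dataset_uri": "s3://a"}, {"extra": {}}])
except ValueError as err:
    batch_errors = err.args[0]
```

In the endpoint, the validated list would then be written within one session, so the events commit (or fail) together.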




[GitHub] [airflow] michaelmicheal commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "michaelmicheal (via GitHub)" <gi...@apache.org>.
michaelmicheal commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1113218259


##########
airflow/datasets/manager.py:
##########
@@ -55,23 +61,33 @@ def register_dataset_change(
         dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
         if not dataset_model:
             self.log.warning("DatasetModel %s not found", dataset)
-            return
-        session.add(
-            DatasetEvent(
+            return None
+
+        if task_instance:
+            dataset_event = DatasetEvent(
                 dataset_id=dataset_model.id,
                 source_task_id=task_instance.task_id,
                 source_dag_id=task_instance.dag_id,
                 source_run_id=task_instance.run_id,
                 source_map_index=task_instance.map_index,
                 extra=extra,
             )
-        )
+        else:
+            # When an external dataset change is made through the API, it isn't triggered by a task instance,
+            # so we create a DatasetEvent without the task and dag data.
+            dataset_event = DatasetEvent(

Review Comment:
   What do you think of the latest changes?
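Both branches share `dataset_id` and `extra`, so one option is to build the common keyword arguments once and add only the source-specific ones per branch. That way the two `DatasetEvent(...)` calls cannot drift apart. The sketch below makes some assumptions:

- A plain dict stands in for the constructor call (the real code would pass it as `DatasetEvent(**kwargs)`).
- A hypothetical `TaskInstanceRef` dataclass stands in for `TaskInstance`.
- The column names are the ones that appear in the diffs in this thread.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class TaskInstanceRef:
    """Hypothetical stand-in for the TaskInstance attributes the diff reads."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int = -1


def dataset_event_kwargs(
    dataset_id: int,
    extra: dict | None = None,
    task_instance: TaskInstanceRef | None = None,
    external_source: str | None = None,
) -> dict[str, Any]:
    """Build the keyword arguments for a DatasetEvent(...) call (sketch)."""
    kwargs: dict[str, Any] = {"dataset_id": dataset_id, "extra": extra}
    if task_instance is not None:
        kwargs.update(
            source_task_id=task_instance.task_id,
            source_dag_id=task_instance.dag_id,
            source_run_id=task_instance.run_id,
            source_map_index=task_instance.map_index,
        )
    else:
        # External change through the API: no task/DAG data, so record the origin instead.
        kwargs["external_source"] = external_source
    return kwargs


from_task = dataset_event_kwargs(7, {"rows": 3}, task_instance=TaskInstanceRef("etl", "load", "manual__1"))
from_api = dataset_event_kwargs(7, external_source="10.0.0.5")
```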




[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1117899923


##########
airflow/models/dataset.py:
##########
@@ -275,6 +275,8 @@ class DatasetEvent(Base):
     source_dag_id = Column(StringID(), nullable=True)
     source_run_id = Column(StringID(), nullable=True)
     source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
+    external_service_id = Column(StringID(), nullable=True)
+    external_source = Column(StringID(), nullable=True)

Review Comment:
   How are these different, and why can they not be combined into one field? Also, what do they mean? I cannot find explanations anywhere.




[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1133577490


##########
airflow/models/dataset.py:
##########
@@ -275,7 +275,7 @@ class DatasetEvent(Base):
     source_dag_id = Column(StringID(), nullable=True)
     source_run_id = Column(StringID(), nullable=True)
     source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
-    external_service_id = Column(StringID(), nullable=True)
+    external_source = Column(StringID(), nullable=True)

Review Comment:
   Why change this?




[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197303230


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -16,20 +16,30 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime, timezone
+
+from flask import request
+from flask_login import current_user
+from marshmallow import ValidationError
 from sqlalchemy import func
 from sqlalchemy.orm import Session, joinedload, subqueryload
 
+from airflow import Dataset

Review Comment:
   This should import from `airflow.datasets` instead.




[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197368654


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   I think there's a mechanism to set it to null when the user is deleted?
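SQL foreign keys support this through an `ON DELETE` action. The sketch below uses Python's built-in `sqlite3` to contrast two actions:

- `ON DELETE SET NULL` keeps the event and clears the link.
- `ON DELETE RESTRICT` refuses to delete a user who still has events, which is the other option raised in this thread.

The tables are simplified, hypothetical stand-ins. In SQLAlchemy the action is declared with `ForeignKey(..., ondelete="SET NULL")`. Whether it is enforced depends on the backend; SQLite, for example, only enforces foreign keys when `PRAGMA foreign_keys` is on.

```python
import sqlite3


def make_db(on_delete: str) -> sqlite3.Connection:
    """Create a tiny schema whose FK uses the given ON DELETE action (trusted literal only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when this is enabled
    conn.executescript(
        f"""
        CREATE TABLE ab_user (id INTEGER PRIMARY KEY, username TEXT NOT NULL);
        CREATE TABLE dataset_event (
            id INTEGER PRIMARY KEY,
            user_id INTEGER REFERENCES ab_user (id) ON DELETE {on_delete}
        );
        INSERT INTO ab_user (id, username) VALUES (1, 'alice');
        INSERT INTO dataset_event (id, user_id) VALUES (1, 1);
        """
    )
    return conn


# SET NULL: the event survives the user's deletion, but its user_id becomes NULL.
set_null = make_db("SET NULL")
set_null.execute("DELETE FROM ab_user WHERE id = 1")
print(set_null.execute("SELECT user_id FROM dataset_event WHERE id = 1").fetchone())  # (None,)

# RESTRICT: deleting a user who still has events is refused.
restrict = make_db("RESTRICT")
try:
    restrict.execute("DELETE FROM ab_user WHERE id = 1")
    deletion_refused = False
except sqlite3.IntegrityError:
    deletion_refused = True
print(deletion_refused)  # True
```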





[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1198099960


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   I guess the other reason, @uranusjr, would be so that we can use standard ORM features such as relationships to get from user to dataset and vice versa, but... I suppose you could say that you could still do so with username via custom join conditions 🤷




[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1198099960


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   I guess the other reason, @uranusjr, would be so that we can use standard ORM features such as relationships to get from user to dataset and vice versa. But... I suppose you could say that you could still do so with custom join conditions 🤷
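To illustrate the "custom join condition" alternative, here is a minimal plain-SQL sketch using the standard-library `sqlite3` module rather than SQLAlchemy ORM relationships. The `external_username` column and the `events_by_user` helper are hypothetical names for illustration only. `ab_user` stands in for the Flask-AppBuilder user table. The point is that events can still be joined to users when the event stores a username string instead of a foreign key to `ab_user.id`.

```python
import sqlite3


def events_by_user(conn, username):
    # Join on the username string rather than on a foreign key to ab_user.id.
    return conn.execute(
        """
        SELECT e.id, u.username
        FROM dataset_event AS e
        JOIN ab_user AS u ON u.username = e.external_username
        WHERE u.username = ?
        ORDER BY e.id
        """,
        (username,),
    ).fetchall()


# Hypothetical, heavily simplified schema for the sketch.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE ab_user (id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL);
    CREATE TABLE dataset_event (
        id INTEGER PRIMARY KEY,
        dataset_id INTEGER NOT NULL,
        external_username TEXT
    );
    INSERT INTO ab_user (id, username) VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO dataset_event (id, dataset_id, external_username)
        VALUES (10, 1, 'alice'), (11, 1, 'bob'), (12, 2, 'alice');
    """
)

print(events_by_user(conn, "alice"))  # prints [(10, 'alice'), (12, 'alice')]
```

The trade-off is that the database no longer enforces referential integrity. A renamed or deleted user leaves events whose username matches nothing.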





[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1118424212


##########
airflow/models/dataset.py:
##########
@@ -275,6 +275,8 @@ class DatasetEvent(Base):
     source_dag_id = Column(StringID(), nullable=True)
     source_run_id = Column(StringID(), nullable=True)
     source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
+    external_service_id = Column(StringID(), nullable=True)
+    external_source = Column(StringID(), nullable=True)

Review Comment:
   As per my earlier comment, @uranusjr:
   
   * by whom - external_auth_id or external_service_id -> required
   * from where (api, client_ip / remote_addr) - external_source -> required
   
   So they cannot be combined as they contain different information and are both required.
   





[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1117899923


##########
airflow/models/dataset.py:
##########
@@ -275,6 +275,8 @@ class DatasetEvent(Base):
     source_dag_id = Column(StringID(), nullable=True)
     source_run_id = Column(StringID(), nullable=True)
     source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
+    external_service_id = Column(StringID(), nullable=True)
+    external_source = Column(StringID(), nullable=True)

Review Comment:
   How are these different and why can they not be combined into one field?





[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1111675033


##########
airflow/datasets/manager.py:
##########
@@ -55,23 +61,33 @@ def register_dataset_change(
         dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
         if not dataset_model:
             self.log.warning("DatasetModel %s not found", dataset)
-            return
-        session.add(
-            DatasetEvent(
+            return None
+
+        if task_instance:
+            dataset_event = DatasetEvent(
                 dataset_id=dataset_model.id,
                 source_task_id=task_instance.task_id,
                 source_dag_id=task_instance.dag_id,
                 source_run_id=task_instance.run_id,
                 source_map_index=task_instance.map_index,
                 extra=extra,
             )
-        )
+        else:
+            # When an external dataset change is made through the API, it isn't triggered by a task instance,
+            # so we create a DatasetEvent without the task and dag data.
+            dataset_event = DatasetEvent(

Review Comment:
   It would be great to have extra information available when the dataset has been changed externally, such as:
   
   * by whom - `external_auth_id` or `external_service_id` -> required
   * from where (api, client_ip / remote_addr) - `external_source` -> required
   * the timestamp of the actual event - so it can be reconciled if required -> Nullable as it might not be available
   
   This ensures lineage isn't broken across systems.
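The three proposed provenance fields and their required/nullable split can be captured in a small container. This is a sketch only. `ExternalEventMetadata` and `source_timestamp` are hypothetical names and are not part of Airflow or of this PR. `external_service_id` and `external_source` follow the names suggested above.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class ExternalEventMetadata:
    """Hypothetical container for the provenance fields proposed in review."""

    external_service_id: str  # by whom: authenticated user or service (required)
    external_source: str  # from where: e.g. "api" or the client's remote address (required)
    source_timestamp: Optional[datetime] = None  # when it happened upstream (nullable)

    def __post_init__(self):
        # Reject missing or empty values for the two required fields.
        for name in ("external_service_id", "external_source"):
            if not getattr(self, name):
                raise ValueError(f"{name} is required")
```

Keeping "by whom" and "from where" as separate fields preserves both pieces of information. For example, it can distinguish two services that call the API from the same client address.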





[GitHub] [airflow] michaelmicheal commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "michaelmicheal (via GitHub)" <gi...@apache.org>.
michaelmicheal commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1113218259


##########
airflow/datasets/manager.py:
##########
@@ -55,23 +61,33 @@ def register_dataset_change(
         dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
         if not dataset_model:
             self.log.warning("DatasetModel %s not found", dataset)
-            return
-        session.add(
-            DatasetEvent(
+            return None
+
+        if task_instance:
+            dataset_event = DatasetEvent(
                 dataset_id=dataset_model.id,
                 source_task_id=task_instance.task_id,
                 source_dag_id=task_instance.dag_id,
                 source_run_id=task_instance.run_id,
                 source_map_index=task_instance.map_index,
                 extra=extra,
             )
-        )
+        else:
+            # When an external dataset change is made through the API, it isn't triggered by a task instance,
+            # so we create a DatasetEvent without the task and dag data.
+            dataset_event = DatasetEvent(

Review Comment:
   What do you think of the latest changes @bolkedebruin?





[GitHub] [airflow] DjVinnii commented on pull request #29433: Add dataset update endpoint

Posted by "DjVinnii (via GitHub)" <gi...@apache.org>.
DjVinnii commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1549165101

   This feature looks like something we can use. I had a quick look and one thing came to mind:
   
   It looks like it is not possible to create a remote dataset event if the dataset does not already exist. Is that correct? In my opinion, it would then also be nice if we could create a dataset through the API.
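The diff above logs a warning and returns `None` when no matching dataset exists. One possible alternative is get-or-create. The sketch below uses standard-library `sqlite3` with hypothetical table and function names (`get_or_create_dataset_id`, `record_external_event`). It is not Airflow's `DatasetModel`/`DatasetManager` API.

```python
import sqlite3

# Hypothetical, simplified schema for the sketch.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dataset (id INTEGER PRIMARY KEY, uri TEXT UNIQUE NOT NULL);
    CREATE TABLE dataset_event (
        id INTEGER PRIMARY KEY,
        dataset_id INTEGER NOT NULL REFERENCES dataset (id)
    );
    """
)


def get_or_create_dataset_id(conn, uri):
    # INSERT OR IGNORE is a no-op when a row with this UNIQUE uri already exists.
    conn.execute("INSERT OR IGNORE INTO dataset (uri) VALUES (?)", (uri,))
    (dataset_id,) = conn.execute("SELECT id FROM dataset WHERE uri = ?", (uri,)).fetchone()
    return dataset_id


def record_external_event(conn, uri):
    # Ensure the dataset row exists, then attach a new event to it.
    dataset_id = get_or_create_dataset_id(conn, uri)
    cur = conn.execute("INSERT INTO dataset_event (dataset_id) VALUES (?)", (dataset_id,))
    return cur.lastrowid
```

A possible downside of auto-creating datasets is that a mistyped URI silently creates a new, orphaned dataset rather than failing. An explicit "create dataset" endpoint avoids that risk.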




[GitHub] [airflow] uranusjr commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197304770


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   Instead of ID, I feel this should use the username. The ID from database should be considered kind of an implementation detail.
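Applied to the line under review, the suggestion changes `getattr(current_user, "id", None)` to read the username instead. Below is a minimal sketch. The helper name `event_actor` is hypothetical, and `SimpleNamespace` stands in for the logged-in user object.

```python
from types import SimpleNamespace
from typing import Optional


def event_actor(user) -> Optional[str]:
    # Record the stable, human-meaningful username instead of the database primary key.
    # The getattr default keeps this safe for principals with no username attribute.
    return getattr(user, "username", None)


print(event_actor(SimpleNamespace(id=7, username="alice")))  # prints "alice"
```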





[GitHub] [airflow] dstandish commented on a diff in pull request #29433: Add dataset update endpoint

Posted by "dstandish (via GitHub)" <gi...@apache.org>.
dstandish commented on code in PR #29433:
URL: https://github.com/apache/airflow/pull/29433#discussion_r1197368157


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -120,3 +130,40 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, total_entries=total_entries)
     )
+
+
+@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET)])
+@provide_session
+def post_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create an external dataset event. This endpoint is useful if you want to update a dataset and
+    trigger downstream DAG runs from external services.
+    """
+    try:
+        json_body = dataset_change_schema.load(get_json_request_dict())
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    uri = json_body["dataset_uri"]
+    external_source = request.remote_addr
+    user_id = getattr(current_user, "id", None)

Review Comment:
   true





[GitHub] [airflow] uranusjr commented on pull request #29433: Add dataset update endpoint

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29433:
URL: https://github.com/apache/airflow/pull/29433#issuecomment-1647142651

   This is not finished and cannot be merged. If you are interested in the feature, please open a new pull request and work on it.

