Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/04 08:50:30 UTC

[GitHub] [airflow] ashb commented on a change in pull request #17405: Put the XCom.clear logic into XCom class to allow it hookable in custom XCom backend

ashb commented on a change in pull request #17405:
URL: https://github.com/apache/airflow/pull/17405#discussion_r682418848



##########
File path: airflow/models/xcom.py
##########
@@ -224,6 +224,52 @@ def delete(cls, xcoms, session=None):
             session.delete(xcom)
         session.commit()
 
+    @classmethod
+    @provide_session
+    def clear(
+        cls,
+        execution_date: pendulum.DateTime,
+        dag_id: Optional[str] = None,
+        task_id: Optional[str] = None,
+        include_prior_dates: bool = False,

Review comment:
       Why did you add this flag?

##########
File path: airflow/models/xcom.py
##########
@@ -224,6 +224,52 @@ def delete(cls, xcoms, session=None):
             session.delete(xcom)
         session.commit()
 
+    @classmethod
+    @provide_session
+    def clear(
+        cls,
+        execution_date: pendulum.DateTime,
+        dag_id: Optional[str] = None,
+        task_id: Optional[str] = None,

Review comment:
       Why are these optional?
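
For context on what the optional arguments imply: the method body quoted in the last hunk of this message adds a `dag_id` or `task_id` filter only when the argument is truthy, so a call that omits both is left with the date filter alone. The sketch below mimics that filter-building logic over plain Python rows instead of a database. `Row` and `rows_matched` are hypothetical names used only for illustration and are not part of Airflow.

```python
from datetime import datetime
from typing import List, NamedTuple, Optional


class Row(NamedTuple):
    """Hypothetical stand-in for an XCom row (illustration only)."""

    dag_id: str
    task_id: str
    execution_date: datetime


def rows_matched(
    rows: List[Row],
    execution_date: datetime,
    dag_id: Optional[str] = None,
    task_id: Optional[str] = None,
) -> List[Row]:
    """Return the rows that the filters built in the diff would select."""
    predicates = []
    # Same shape as the diff: a filter is added only for truthy arguments.
    if dag_id:
        predicates.append(lambda r: r.dag_id == dag_id)
    if task_id:
        predicates.append(lambda r: r.task_id == task_id)
    # Only the include_prior_dates=False branch is modelled here.
    predicates.append(lambda r: r.execution_date == execution_date)
    return [r for r in rows if all(p(r) for p in predicates)]


day = datetime(2021, 8, 4)
rows = [
    Row("dag_a", "task_1", day),
    Row("dag_a", "task_2", day),
    Row("dag_b", "task_1", day),
]

# Fully specified: a single task instance's row is selected.
narrow = rows_matched(rows, day, dag_id="dag_a", task_id="task_1")

# Both arguments omitted: every row for that date is selected, across DAGs.
broad = rows_matched(rows, day)
```

Under these assumptions, a caller that forgets `dag_id` and `task_id` would select rows belonging to every DAG for that date, which is one reason a reviewer might prefer required arguments.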

##########
File path: airflow/models/xcom.py
##########
@@ -224,6 +224,52 @@ def delete(cls, xcoms, session=None):
             session.delete(xcom)
         session.commit()
 
+    @classmethod
+    @provide_session
+    def clear(
+        cls,
+        execution_date: pendulum.DateTime,
+        dag_id: Optional[str] = None,
+        task_id: Optional[str] = None,
+        include_prior_dates: bool = False,
+        session: Session = None,
+    ) -> None:
+        """
+        Clears all XCom data from the database for the task instance
+
+        :param execution_date: Execution date for the task
+        :type execution_date: pendulum.datetime
+        :param dag_id: If provided, only pulls XCom from this DAG.
+            If None (default), the DAG of the calling task is used.
+        :type dag_id: str
+        :param task_id: Only XComs from task with matching id will be
+            pulled. Can pass None to remove the filter.
+        :type task_id: str
+        :param include_prior_dates: If False, only XComs from the current
+            execution_date are returned. If True, XComs from previous dates
+            are returned as well.
+        :type include_prior_dates: bool
+        :param session: database session
+        :type session: sqlalchemy.orm.session.Session
+        """
+        filters = []
+
+        if dag_id:
+            filters.append(cls.dag_id == dag_id)
+
+        if task_id:
+            filters.append(cls.task_id == task_id)
+
+        if include_prior_dates:
+            filters.append(cls.execution_date <= execution_date)
+        else:
+            filters.append(cls.execution_date == execution_date)
+
+        session.query(cls).filter(
+            and_(*filters)
+        ).delete()
+        session.commit()

Review comment:
       https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#database-session-handling
   
   ```suggestion
   ```
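
The empty suggestion above amounts to deleting the trailing `session.commit()`. A minimal sketch of the pattern the linked guideline describes, as understood here: a function that receives a `session` leaves the commit to whoever created the session. `FakeSession`, `create_session`, and this simplified keyword-only `provide_session` are hypothetical stand-ins written for illustration; they are not Airflow's or SQLAlchemy's actual implementations.

```python
import functools
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a database session; it only records calls."""

    def __init__(self):
        self.calls = []

    def delete_matching(self, description):
        self.calls.append(f"delete {description}")

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


CREATED_SESSIONS = []  # lets the example inspect sessions the decorator made


@contextmanager
def create_session():
    """Own the transaction: commit on success, roll back on error, then close."""
    session = FakeSession()
    CREATED_SESSIONS.append(session)
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def provide_session(func):
    """Simplified decorator: supply a session only if the caller gave none."""

    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            # The caller owns the transaction, so nothing is committed here.
            return func(*args, session=session, **kwargs)
        with create_session() as new_session:
            return func(*args, session=new_session, **kwargs)

    return wrapper


@provide_session
def clear(execution_date, dag_id, task_id, session=None):
    # The function does its work and deliberately does not call commit().
    session.delete_matching(f"{dag_id}.{task_id} @ {execution_date}")


# No session passed: the decorator creates one and commits it on exit.
clear("2021-08-04", "my_dag", "my_task")

# Session passed in: the caller decides when (or whether) to commit.
outer = FakeSession()
clear("2021-08-04", "my_dag", "my_task", session=outer)
```

With this split, a caller can group several operations into one transaction, which an unconditional commit inside `clear` would prevent.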
   
   
   



