Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/12 23:11:48 UTC

[GitHub] [airflow] dstandish opened a new pull request #20838: Add `maintenance cleanup` CLI command for purging old data

dstandish opened a new pull request #20838:
URL: https://github.com/apache/airflow/pull/20838


   Must supply a "purge before date".
   Can optionally provide a table list.
   A dry run will only print the number of rows meeting the criteria.
   If not a dry run, the user will be required to confirm before deleting.
   
   *Note*: draft; still need to add tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783523781



##########
File path: airflow/cli/cli_parser.py
##########
@@ -1054,6 +1080,14 @@ class GroupCommand(NamedTuple):
         args=(ARG_CLEAR_ONLY,),
     ),
 )
+MAINTENANCE_COMMANDS = (
+    ActionCommand(
+        name='cleanup',

Review comment:
       In the future, we can add other commands that will clean other things, e.g. logs.





[GitHub] [airflow] dstandish commented on pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1040527175


   > Static checks :) ?
   
   Sorry, should be fixed now.



[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r811540050



##########
File path: airflow/cli/commands/db_command.py
##########
@@ -94,3 +95,26 @@ def shell(args):
 def check(_):
     """Runs a check command that checks if db is available."""
     db.check()
+
+
+@cli_utils.action_cli(check_db=False)
+def clean(args):
+    """Upgrades the metadata database"""
+    print(f"DB: {settings.engine.url!r}")
+    db.upgradedb()

Review comment:
       I don't think this is even used anywhere; let's remove this function.
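
    For context, the `db clean` entrypoint would presumably delegate to `run_cleanup` instead, along these lines (a sketch based on the call shape discussed elsewhere in this thread, not the PR's final code):

    ```python
    from airflow.utils import cli as cli_utils
    from airflow.utils.db_cleanup import run_cleanup


    @cli_utils.action_cli(check_db=False)
    def cleanup_tables(args):
        """Purges old records in the metadata database."""
        run_cleanup(
            table_names=args.tables,
            dry_run=args.dry_run,
            clean_before_timestamp=args.clean_before_timestamp,
            verbose=args.verbose,
        )
    ```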





[GitHub] [airflow] rickshapiroBetter commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
rickshapiroBetter commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r803008591



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :type: orm_model: Base
+    :param recency_column: date column to filter by
+    :type: recency_column: Union["Column", "InstrumentedAttribute"]
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :type: keep_last: bool
+    :param keep_last_filters:
+    :type: keep_last_filters: Optional[Any]
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :type: keep_last_group_by: Optional[Any]
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    :type warn_if_missing: bool
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < self.orm_model.__tablename__

Review comment:
       shouldn't this reference `other` arg instead of `self` twice?
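
    For reference, the corrected comparison would be (a minimal sketch of the fix being suggested):

    ```python
    def __lt__(self, other):
        # compare our table name against the *other* instance's, not our own
        return self.orm_model.__tablename__ < other.orm_model.__tablename__
    ```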





[GitHub] [airflow] kaxil commented on pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1048341688


   LGTM, some docs for this would be nice.



[GitHub] [airflow] uranusjr commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r811020543



##########
File path: airflow/utils/db_cleanup.py
##########
@@ -0,0 +1,307 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters: the "keep last" functionality will preserve the most recent record
+        in the table.  to ignore certain records even if they are the latest in the table, you can
+        supply additional filters here (e.g. externally triggered dag runs)
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < self.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    logger.debug("print entities query: " + str(query))

Review comment:
       ```suggestion
       logger.debug("print entities query: %s", query)
   ```
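
    For context on why: `+ str(query)` renders the query eagerly even when DEBUG is disabled, whereas a `%s` argument is only interpolated if the record is actually emitted. A minimal illustration (the `query` value here is a stand-in):

    ```python
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    query = "SELECT * FROM log"  # stand-in for the SQLAlchemy Query object

    # Eager: the concatenation runs even though the DEBUG record is filtered out.
    logger.debug("print entities query: " + str(query))

    # Lazy: interpolation is skipped entirely because the record is never emitted.
    logger.debug("print entities query: %s", query)
    ```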





[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r812428381



##########
File path: airflow/cli/commands/db_command.py
##########
@@ -94,3 +95,26 @@ def shell(args):
 def check(_):
     """Runs a check command that checks if db is available."""
     db.check()
+
+
+@cli_utils.action_cli(check_db=False)
+def clean(args):
+    """Upgrades the metadata database"""
+    print(f"DB: {settings.engine.url!r}")
+    db.upgradedb()

Review comment:
       Yup, removed, thanks!





[GitHub] [airflow] potiuk commented on pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1011937600


   Love this @dstandish :heart: 



[GitHub] [airflow] uranusjr commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783672138



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {

Review comment:
       This structured data should use a more structured type than a dict of dicts. A dict of namedtuples would be a big improvement.
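
    As a rough sketch of what that could look like (names here are illustrative; the PR later settled on a `_TableConfig` dataclass, shown further down in this thread):

    ```python
    from typing import Any, NamedTuple, Optional

    from airflow.jobs.base_job import BaseJob
    from airflow.models import DagModel


    class CleanupConfig(NamedTuple):  # hypothetical name
        orm_model: Any
        recency_column: Any
        keep_last: bool = False
        keep_last_filters: Optional[Any] = None
        keep_last_group_by: Optional[Any] = None


    objects = {
        "job": CleanupConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
        "dag": CleanupConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
        # ...and so on for the remaining tables
    }
    ```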





[GitHub] [airflow] uranusjr commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783669005



##########
File path: airflow/cli/commands/maintenance_command.py
##########
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Maintenance sub-commands"""
+from airflow.utils.metastore_cleanup import run_cleanup
+
+
+def cleanup(args):
+    """Purges old records in metastore database"""
+    print(args.dry_run)
+    kwargs = dict(
+        dry_run=args.dry_run, clean_before_timestamp=args.clean_before_timestamp, verbose=args.verbose
+    )
+    if args.tables:
+        kwargs.update(
+            table_names=args.tables,
+        )
+    run_cleanup(**kwargs)

Review comment:
       Since you do `if table_name` in `run_cleanup`, this is equivalent to
   
   ```python
   run_cleanup(
       dry_run=args.dry_run,
       clean_before_timestamp=args.clean_before_timestamp,
       verbose=args.verbose,
       table_names=args.tables,
   )
   ```
   
    With appropriate docstring in `run_cleanup` (to clarify that passing an empty list is the same as not passing the argument), this would be more readable and maintainable than the current implementation IMO.
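
    e.g. a docstring along these lines (a sketch; the actual signature in the PR may differ):

    ```python
    def run_cleanup(*, clean_before_timestamp, table_names=None, dry_run=False, verbose=False):
        """Purge old records from the metadata database.

        :param table_names: tables to clean; ``None`` or an empty list means
            "clean all configured tables", so callers can pass ``args.tables``
            through unconditionally.
        """
    ```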





[GitHub] [airflow] SamWheating commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783539524



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {

Review comment:
       Is there any reason to expire old Import Errors?
   
   They get removed constantly by the File Processor as files are re-processed:
   https://github.com/apache/airflow/blob/3ccb79423e8966305bb762200b53134dd2b349ec/airflow/dag_processing/processor.py#L539-L543
   
   And they get removed when the related file no longer exists:
   https://github.com/apache/airflow/blob/3ccb79423e8966305bb762200b53134dd2b349ec/airflow/dag_processing/processor.py#L539-L543
   
   So I wouldn't expect there to be any dangling records here





[GitHub] [airflow] PApostol commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
PApostol commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r784108849



##########
File path: airflow/cli/cli_parser.py
##########
@@ -162,6 +162,11 @@ def _check(value):
     return _check
 
 
+def string_list_type(val):
+    """Splits comma-separated list and returns the lists (strips whitespace)"""

Review comment:
       Perhaps you mean "Splits comma-separated _string_ and returns the result _in a list_"?
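
    For reference, the helper itself is presumably along these lines (a sketch inferred from the docstring, not the PR's exact code):

    ```python
    def string_list_type(val):
        """Split a comma-separated string and return the parts in a list (stripping whitespace)."""
        # e.g. string_list_type("dag_run, task_instance") -> ["dag_run", "task_instance"]
        return [x.strip() for x in val.split(",")]
    ```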





[GitHub] [airflow] mik-laj commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783520216



##########
File path: airflow/cli/cli_parser.py
##########
@@ -378,6 +383,27 @@ def _check(value):
 ARG_CONF = Arg(('-c', '--conf'), help="JSON string that gets pickled into the DagRun's conf attribute")
 ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
 
+# maintenance
+ARG_MAINTENANCE_TABLES = Arg(
+    ("-t", "--tables"),
+    help="Table names to perform maintenance on (use comma-separated list)",

Review comment:
       Can you add a list of accepted values?
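
    One way to do that inside `cli_parser.py` (illustrative only; it assumes the table list is importable, which the PR later enables via a lazily imported `all_tables`):

    ```python
    from airflow.cli.commands.db_command import all_tables

    ARG_MAINTENANCE_TABLES = Arg(
        ("-t", "--tables"),
        help=(
            "Table names to perform maintenance on (use comma-separated list). "
            f"Options: {', '.join(all_tables)}"
        ),
    )
    ```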





[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r807327511



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       https://github.com/apache/airflow/pull/20838/files#r807326376 will affect the module name. How about `airflow/utils/db_cleanup.py`?





[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r807326376



##########
File path: airflow/cli/commands/db_command.py
##########
@@ -94,3 +95,26 @@ def shell(args):
 def check(_):
     """Runs a check command that checks if db is available."""
     db.check()
+
+
+@cli_utils.action_cli(check_db=False)
+def clean(args):
+    """Upgrades the metadata database"""
+    print(f"DB: {settings.engine.url!r}")
+    db.upgradedb()
+
+
+# lazily imported by CLI parser for `help` command
+all_tables = sorted(config_dict)
+
+
+@cli_utils.action_cli(check_db=False)
+def cleanup_tables(args):
+    """Purges old records in metastore database"""

Review comment:
       nit: There are different versions of the same name: "metadata database" (L102), "metastore database" (L113) and "metastore tables" (L1324 of `airflow/cli/cli_parser.py`) -- it would be good to unify on "metadata database", "tables in the metadata DB", etc.





[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r811538584



##########
File path: airflow/utils/db_cleanup.py
##########
@@ -0,0 +1,307 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters: the "keep last" functionality will preserve the most recent record
+        in the table.  to ignore certain records even if they are the latest in the table, you can
+        supply additional filters here (e.g. externally triggered dag runs)
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < self.orm_model.__tablename__

Review comment:
       ```suggestion
           return self.orm_model.__tablename__ < other.orm_model.__tablename__
   ```





[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r811539251



##########
File path: airflow/utils/db_cleanup.py
##########
@@ -0,0 +1,307 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters: the "keep last" functionality will preserve the most recent record
+        in the table.  to ignore certain records even if they are the latest in the table, you can
+        supply additional filters here (e.g. externally triggered dag runs)
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < self.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]

Review comment:
       It would be worth double-checking whether any new ORM models were added while this PR was in review.





[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r803044621



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :type: orm_model: Base
+    :param recency_column: date column to filter by
+    :type: recency_column: Union["Column", "InstrumentedAttribute"]
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :type: keep_last: bool
+    :param keep_last_filters:
+    :type: keep_last_filters: Optional[Any]
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :type: keep_last_group_by: Optional[Any]
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    :type warn_if_missing: bool
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < self.orm_model.__tablename__

Review comment:
       > shouldn't this reference `other` arg instead of `self` twice?
   
   ha, yup!
   





[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783734889



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {

Review comment:
       There are other cases where files are renamed or deleted when they have an import error. These are also handled by:
   
   https://github.com/apache/airflow/blob/3ccb79423e8966305bb762200b53134dd2b349ec/airflow/dag_processing/manager.py#L696-L708
   
    But I still feel there is no harm in keeping this code here, as it keeps the cleanup of all tables consistent.





[GitHub] [airflow] potiuk commented on pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1040511386


   Static checks :) ? 



[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r807331051



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < self.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    logger.debug("print entities query: " + str(query))
+    for entry in query.limit(max_rows_to_print):
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41

Review comment:
       Can we add a summary of that issue here? This URL can become a 404 if the repo is deleted.
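
    For the record, the gist of that issue: MySQL rejects a DELETE that references its own target table in a subquery of the same statement (error 1093, "You can't specify target table ... for update in FROM clause"). Wrapping the subquery in an extra SELECT turns it into a derived table that MySQL materializes first. A rough SQLAlchemy sketch of the extra wrapping layer (illustrative, not the PR's exact code):

    ```python
    from sqlalchemy import func, select

    from airflow.models import DagRun

    # Subquery computing the most recent start_date per dag_id.
    inner = (
        select([func.max(DagRun.start_date).label("max_start_date")])
        .group_by(DagRun.dag_id)
        .subquery()
    )

    # Extra SELECT layer: MySQL materializes this derived table, so a DELETE
    # on dag_run no longer references its own target table directly.
    materialized = select([inner.c.max_start_date]).subquery()
    ```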





[GitHub] [airflow] kaxil commented on pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1012518905


   > We **just** had a discussion yesterday with @mik-laj and @mhenc that we need it :)
   
   We have got more stuff planned to make Airflow upgrades more reliable - stay tuned
   
   



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r812535925



##########
File path: airflow/utils/db_cleanup.py
##########
@@ -0,0 +1,307 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters: the "keep last" functionality will preserve the most recent record
+        in the table.  to ignore certain records even if they are the latest in the table, you can
+        supply additional filters here (e.g. externally triggered dag runs)
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < self.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]

Review comment:
       good call. i also added a test to make sure that new ORMs are either added to the list, or added to an exclusion list
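
       for illustration, the test looks roughly like this (a sketch -- the exclusion-list name and the import path are illustrative, not the exact code):

       ```python
       from airflow.models import Base
       from airflow.utils.db_cleanup import config_dict

       # tables we deliberately never clean (illustrative names, not the real list)
       EXCLUDED_TABLES = {"connection", "variable"}


       def test_no_table_left_behind():
           all_tables = set(Base.metadata.tables)  # every table known to the ORM metadata
           uncovered = all_tables - set(config_dict) - EXCLUDED_TABLES
           assert not uncovered, f"add these tables to config_dict or EXCLUDED_TABLES: {uncovered}"
       ```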




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r796279528



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :type: orm_model: Base
+    :param recency_column: date column to filter by
+    :type: recency_column: Union["Column", "InstrumentedAttribute"]
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :type: keep_last: bool
+    :param keep_last_filters:
+    :type: keep_last_filters: Optional[Any]
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :type: keep_last_group_by: Optional[Any]
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    :type warn_if_missing: bool
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__

Review comment:
       Probably easier to understand if this logic is put into `sorted(..., key=...)` instead. This class does not need to be generally sortable (and the sorting logic isn’t obvious either).
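
       e.g. (sketch):

       ```python
       # sort at the call site; no __lt__ on _TableConfig needed
       config_dict: Dict[str, _TableConfig] = {
           x.orm_model.__tablename__: x
           for x in sorted(config_list, key=lambda x: x.orm_model.__tablename__)
       }
       ```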

##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :type: orm_model: Base
+    :param recency_column: date column to filter by
+    :type: recency_column: Union["Column", "InstrumentedAttribute"]
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :type: keep_last: bool
+    :param keep_last_filters:
+    :type: keep_last_filters: Optional[Any]
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :type: keep_last_group_by: Optional[Any]
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    :type warn_if_missing: bool
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    entries_to_delete = query.limit(max_rows_to_print).all()
+    logging.debug("print entities query: " + str(query))
+    for entry in entries_to_delete:  # type: Log
+        print(entry.__dict__)

Review comment:
       ```suggestion
       logging.debug("print entities query: %s", query)
       for entry in query.limit(max_rows_to_print):  # type: Log
           print(entry.__dict__)
   ```
   
   Also, the first line logs the query without `limit`, and is using the root logger (instead of a namespaced one). Is this intended?
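
   For comparison, a namespaced module-level logger would look like this (sketch):

   ```python
   import logging

   # named after the module (e.g. "airflow.utils.metastore_cleanup") instead of the root logger
   logger = logging.getLogger(__name__)


   def _print_entities(*, query, print_rows=False):
       logger.debug("print entities query: %s", query)
   ```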

##########
File path: airflow/cli/commands/db_command.py
##########
@@ -94,3 +95,26 @@ def shell(args):
 def check(_):
     """Runs a check command that checks if db is available."""
     db.check()
+
+
+@cli_utils.action_cli(check_db=False)
+def clean(args):
+    """Upgrades the metadata database"""
+    print("DB: " + repr(settings.engine.url))
+    db.upgradedb()
+
+
+# lazily imported by CLI parser for `help` command
+all_tables = list(sorted(config_dict.keys()))

Review comment:
       ```suggestion
   all_tables = sorted(config_dict)
   ```

##########
File path: airflow/cli/commands/db_command.py
##########
@@ -94,3 +95,26 @@ def shell(args):
 def check(_):
     """Runs a check command that checks if db is available."""
     db.check()
+
+
+@cli_utils.action_cli(check_db=False)
+def clean(args):
+    """Upgrades the metadata database"""
+    print("DB: " + repr(settings.engine.url))

Review comment:
       ```suggestion
       print(f"DB: {settings.engine.url!r}")
   ```

##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :type: orm_model: Base
+    :param recency_column: date column to filter by
+    :type: recency_column: Union["Column", "InstrumentedAttribute"]
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :type: keep_last: bool
+    :param keep_last_filters:
+    :type: keep_last_filters: Optional[Any]
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :type: keep_last_group_by: Optional[Any]
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    :type warn_if_missing: bool
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    entries_to_delete = query.limit(max_rows_to_print).all()
+    logging.debug("print entities query: " + str(query))
+    for entry in entries_to_delete:  # type: Log
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+    subquery = session.query(func.max(recency_column))
+
+    if keep_last_filters is not None:
+        for entry in keep_last_filters:
+            subquery = subquery.filter(entry)
+
+    if keep_last_group_by is not None:
+        subquery = subquery.group_by(keep_last_group_by)
+
+    subquery = subquery.from_self()
+    return subquery
+
+
+def _build_query(
+    *,
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    session,
+    **kwargs,
+):
+    query = session.query(orm_model)
+    conditions = [recency_column < clean_before_timestamp]
+    if keep_last:
+        subquery = _subquery_keep_last(
+            recency_column=recency_column,
+            keep_last_filters=keep_last_filters,
+            keep_last_group_by=keep_last_group_by,
+            session=session,
+        )
+        print(subquery.all())
+        conditions.append(recency_column.notin_(subquery))
+    query = query.filter(and_(*conditions))
+    return query
+
+
+logger = logging.getLogger(__file__)
+
+
+def _cleanup_table(
+    *,
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    dry_run=True,
+    verbose=False,
+    session=None,
+    **kwargs,
+):
+    print()
+    if dry_run:
+        print(f"Performing dry run for table {orm_model.__tablename__!r}")
+    query = _build_query(
+        orm_model=orm_model,
+        recency_column=recency_column,
+        keep_last=keep_last,
+        keep_last_filters=keep_last_filters,
+        keep_last_group_by=keep_last_group_by,
+        clean_before_timestamp=clean_before_timestamp,
+        session=session,
+    )
+
+    _print_entities(query=query, print_rows=False)
+
+    if not dry_run:
+        _do_delete(query=query, session=session)
+        session.commit()
+
+
+def _confirm_delete(*, date: DateTime, tables: List[str]):
+    for_tables = f" for tables {tables!r}" if tables else ''
+    question = '\n'.join(
+        [
+            f"You have requested that we purge all data prior to {date}{for_tables}.",
+            "This is irreversible.  Consider backing up the tables first and / or doing a dry run"
+            " with option --dry-run.",
+            "Enter 'delete rows' to proceed.",
+        ]
+    )
+    print(question)
+    answer = input().strip()
+    if not answer == 'delete rows':
+        raise SystemExit("User did not confirm; exiting.")
+
+
+def _print_config(*, configs: Dict[str, _TableConfig]):
+    data = [x.readable_config for x in configs.values()]
+    AirflowConsole().print_as_table(data=data)
+
+
+class _warn_if_missing(AbstractContextManager):
+    def __init__(self, table, suppress):
+        self.table = table
+        self.suppress = suppress
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excinst, exctb):
+        caught_error = exctype is not None and issubclass(exctype, OperationalError)
+        if caught_error:
+            logger = logging.getLogger()

Review comment:
       What does this do?

##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :type: orm_model: Base
+    :param recency_column: date column to filter by
+    :type: recency_column: Union["Column", "InstrumentedAttribute"]
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :type: keep_last: bool
+    :param keep_last_filters:
+    :type: keep_last_filters: Optional[Any]
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :type: keep_last_group_by: Optional[Any]
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    :type warn_if_missing: bool
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    entries_to_delete = query.limit(max_rows_to_print).all()
+    logging.debug("print entities query: " + str(query))
+    for entry in entries_to_delete:  # type: Log
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+    subquery = session.query(func.max(recency_column))
+
+    if keep_last_filters is not None:
+        for entry in keep_last_filters:
+            subquery = subquery.filter(entry)
+
+    if keep_last_group_by is not None:
+        subquery = subquery.group_by(keep_last_group_by)
+
+    subquery = subquery.from_self()
+    return subquery
+
+
+def _build_query(
+    *,
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    session,
+    **kwargs,
+):
+    query = session.query(orm_model)
+    conditions = [recency_column < clean_before_timestamp]
+    if keep_last:
+        subquery = _subquery_keep_last(
+            recency_column=recency_column,
+            keep_last_filters=keep_last_filters,
+            keep_last_group_by=keep_last_group_by,
+            session=session,
+        )
+        print(subquery.all())
+        conditions.append(recency_column.notin_(subquery))
+    query = query.filter(and_(*conditions))
+    return query
+
+
+logger = logging.getLogger(__file__)
+
+
+def _cleanup_table(
+    *,
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    dry_run=True,
+    verbose=False,
+    session=None,
+    **kwargs,
+):
+    print()
+    if dry_run:
+        print(f"Performing dry run for table {orm_model.__tablename__!r}")
+    query = _build_query(
+        orm_model=orm_model,
+        recency_column=recency_column,
+        keep_last=keep_last,
+        keep_last_filters=keep_last_filters,
+        keep_last_group_by=keep_last_group_by,
+        clean_before_timestamp=clean_before_timestamp,
+        session=session,
+    )
+
+    _print_entities(query=query, print_rows=False)
+
+    if not dry_run:
+        _do_delete(query=query, session=session)
+        session.commit()
+
+
+def _confirm_delete(*, date: DateTime, tables: List[str]):
+    for_tables = f" for tables {tables!r}" if tables else ''
+    question = '\n'.join(
+        [
+            f"You have requested that we purge all data prior to {date}{for_tables}.",
+            "This is irreversible.  Consider backing up the tables first and / or doing a dry run"
+            " with option --dry-run.",
+            "Enter 'delete rows' to proceed.",
+        ]
+    )
+    print(question)
+    answer = input().strip()
+    if not answer == 'delete rows':
+        raise SystemExit("User did not confirm; exiting.")
+
+
+def _print_config(*, configs: Dict[str, _TableConfig]):
+    data = [x.readable_config for x in configs.values()]
+    AirflowConsole().print_as_table(data=data)
+
+
+class _warn_if_missing(AbstractContextManager):
+    def __init__(self, table, suppress):
+        self.table = table
+        self.suppress = suppress
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excinst, exctb):
+        caught_error = exctype is not None and issubclass(exctype, OperationalError)
+        if caught_error:
+            logger = logging.getLogger()
+            logger.warning(f"Table {self.table!r} not found.  Skipping.")

Review comment:
       ```suggestion
               logger.warning(f"Table %r not found.  Skipping.", self.table)
   ```
   
   Don’t use f-string in logging in general.
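
   For reference: with %-style placeholders the arguments are only interpolated when the record is actually emitted, whereas an f-string is formatted eagerly even if the message is filtered out. Mirroring the line above:

   ```python
   # eager: the string is built even if this record is never emitted
   logger.warning(f"Table {self.table!r} not found.  Skipping.")

   # lazy: self.table is only interpolated when a handler emits the record
   logger.warning("Table %r not found.  Skipping.", self.table)
   ```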

##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :type: orm_model: Base
+    :param recency_column: date column to filter by
+    :type: recency_column: Union["Column", "InstrumentedAttribute"]
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :type: keep_last: bool
+    :param keep_last_filters:
+    :type: keep_last_filters: Optional[Any]
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :type: keep_last_group_by: Optional[Any]
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    :type warn_if_missing: bool
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    entries_to_delete = query.limit(max_rows_to_print).all()
+    logging.debug("print entities query: " + str(query))
+    for entry in entries_to_delete:  # type: Log
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+    subquery = session.query(func.max(recency_column))
+
+    if keep_last_filters is not None:
+        for entry in keep_last_filters:
+            subquery = subquery.filter(entry)
+
+    if keep_last_group_by is not None:
+        subquery = subquery.group_by(keep_last_group_by)
+
+    subquery = subquery.from_self()
+    return subquery
+
+
+def _build_query(
+    *,
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    session,
+    **kwargs,
+):
+    query = session.query(orm_model)
+    conditions = [recency_column < clean_before_timestamp]
+    if keep_last:
+        subquery = _subquery_keep_last(
+            recency_column=recency_column,
+            keep_last_filters=keep_last_filters,
+            keep_last_group_by=keep_last_group_by,
+            session=session,
+        )
+        print(subquery.all())

Review comment:
       Stray debugging code?

##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :type: orm_model: Base
+    :param recency_column: date column to filter by
+    :type: recency_column: Union["Column", "InstrumentedAttribute"]
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :type: keep_last: bool
+    :param keep_last_filters:
+    :type: keep_last_filters: Optional[Any]
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :type: keep_last_group_by: Optional[Any]
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    :type warn_if_missing: bool
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    entries_to_delete = query.limit(max_rows_to_print).all()
+    logging.debug("print entities query: " + str(query))
+    for entry in entries_to_delete:  # type: Log
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+    subquery = session.query(func.max(recency_column))
+
+    if keep_last_filters is not None:
+        for entry in keep_last_filters:
+            subquery = subquery.filter(entry)
+
+    if keep_last_group_by is not None:
+        subquery = subquery.group_by(keep_last_group_by)
+
+    subquery = subquery.from_self()
+    return subquery
+
+
+def _build_query(
+    *,
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    session,
+    **kwargs,
+):
+    query = session.query(orm_model)
+    conditions = [recency_column < clean_before_timestamp]
+    if keep_last:
+        subquery = _subquery_keep_last(
+            recency_column=recency_column,
+            keep_last_filters=keep_last_filters,
+            keep_last_group_by=keep_last_group_by,
+            session=session,
+        )
+        print(subquery.all())
+        conditions.append(recency_column.notin_(subquery))
+    query = query.filter(and_(*conditions))
+    return query
+
+
+logger = logging.getLogger(__file__)
+
+
+def _cleanup_table(
+    *,
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    dry_run=True,
+    verbose=False,
+    session=None,
+    **kwargs,
+):
+    print()
+    if dry_run:
+        print(f"Performing dry run for table {orm_model.__tablename__!r}")
+    query = _build_query(
+        orm_model=orm_model,
+        recency_column=recency_column,
+        keep_last=keep_last,
+        keep_last_filters=keep_last_filters,
+        keep_last_group_by=keep_last_group_by,
+        clean_before_timestamp=clean_before_timestamp,
+        session=session,
+    )
+
+    _print_entities(query=query, print_rows=False)
+
+    if not dry_run:
+        _do_delete(query=query, session=session)
+        session.commit()
+
+
+def _confirm_delete(*, date: DateTime, tables: List[str]):
+    for_tables = f" for tables {tables!r}" if tables else ''
+    question = '\n'.join(
+        [
+            f"You have requested that we purge all data prior to {date}{for_tables}.",
+            "This is irreversible.  Consider backing up the tables first and / or doing a dry run"
+            " with option --dry-run.",
+            "Enter 'delete rows' to proceed.",
+        ]
+    )

Review comment:
       ```suggestion
       question = (
           f"You have requested that we purge all data prior to {date}{for_tables}.\n"
           f"This is irreversible.  Consider backing up the tables first and / or doing a dry run "
           f"with option --dry-run.\n"
           f"Enter 'delete rows' (without quotes) to proceed."
       )
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r811538422



##########
File path: airflow/cli/commands/db_command.py
##########
@@ -94,3 +95,26 @@ def shell(args):
 def check(_):
     """Runs a check command that checks if db is available."""
     db.check()
+
+
+@cli_utils.action_cli(check_db=False)
+def clean(args):
+    """Upgrades the metadata database"""
+    print(f"DB: {settings.engine.url!r}")
+    db.upgradedb()

Review comment:
       This seems to be a mistake. The function name is `clean`, but the docstring and code are about upgrading the DB.
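
       Presumably it was meant to invoke the new cleanup logic instead, e.g. (sketch only -- `run_cleanup` and the argument names are guesses at this PR's API, not confirmed code):

       ```python
       from airflow.utils.db_cleanup import run_cleanup  # hypothetical entry point


       @cli_utils.action_cli(check_db=False)
       def clean(args):
           """Purges old records in metadata database tables"""
           run_cleanup(
               table_names=args.tables,  # guessed CLI argument names
               dry_run=args.dry_run,
               clean_before_timestamp=args.clean_before_timestamp,
               verbose=args.verbose,
           )
       ```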




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1048341853


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil edited a comment on pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1048341688


   Lgtm, some user-facing docs for this would be nice


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil edited a comment on pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1012518905


   > We **just** had a discussion that we need it yesterday with @mik-laj and @mhenc :) 
   
   We have got more stuff planned to make Airflow Upgrades more reliable - stay tuned, this is just the start :)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil edited a comment on pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1012518905


   > We **just** had a discussion that we need it yesterday with @mik-laj and @mhenc :) 
   
   We have got more stuff planned to make Airflow Upgrades more reliable - stay tuned
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1011938595


   We **just** had a discussion that we need it yesterday with @mik-laj and @mhenc :) . Great minds think alike :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1024690713


   renamed to `airflow db clean`
   
   welcome opinions on the naming


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r812430460



##########
File path: airflow/utils/db_cleanup.py
##########
@@ -0,0 +1,307 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters: the "keep last" functionality will preserve the most recent record
+        in the table.  to ignore certain records even if they are the latest in the table, you can
+        supply additional filters here (e.g. externally triggered dag runs)
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}

Review comment:
       maybe "need" is a bit strong.  But, i think the intention is, that for user experience, it's best when the table outputs are sorted.  So e.g. when you are reviewing a list of tables, if perhaps you're scanning for one, if it's sorted, it will be easier to find.  And, since we only add the celery models when they are there to add, we can't just build the list already  completely sorted.  Or we could i guess but the code would be slightly messier.  WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish merged pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish merged pull request #20838:
URL: https://github.com/apache/airflow/pull/20838


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783602610



##########
File path: airflow/cli/cli_parser.py
##########
@@ -378,6 +383,27 @@ def _check(value):
 ARG_CONF = Arg(('-c', '--conf'), help="JSON string that gets pickled into the DagRun's conf attribute")
 ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
 
+# maintenance
+ARG_MAINTENANCE_TABLES = Arg(
+    ("-t", "--tables"),
+    help="Table names to perform maintenance on (use comma-separated list)",

Review comment:
       good idea
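
    (For context in the archive: a rough sketch of how a comma-separated `--tables` value could be split and validated; `parse_table_names` is a hypothetical helper, not code from this PR.)

```python
from typing import List, Set


def parse_table_names(raw: str, known: Set[str]) -> List[str]:
    """Split a comma-separated --tables value and reject unknown names."""
    requested = [t.strip() for t in raw.split(",") if t.strip()]
    unknown = sorted(set(requested) - known)
    if unknown:
        raise SystemExit(f"Unknown table(s): {', '.join(unknown)}")
    return requested


print(parse_table_names("log, xcom", {"log", "xcom", "dag_run"}))  # ['log', 'xcom']
```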




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r784181709



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {

Review comment:
       Yep, totally agree, it's on the list!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r808251808



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    logger.debug("print entities query: " + str(query))
+    for entry in query.limit(max_rows_to_print):
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41

Review comment:
       and keep the URL too?
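
    (A condensed sketch of the workaround being discussed, assuming SQLAlchemy 1.x where `Query.from_self()` exists. MySQL rejects a DELETE whose subquery selects from the same table; wrapping the subquery in a derived table sidesteps the "You can't specify target table ... in FROM clause" error.)

```python
from sqlalchemy import false, func

from airflow import settings
from airflow.models import DagRun

session = settings.Session()  # assumes a configured Airflow environment

# Latest non-externally-triggered run per dag; from_self() wraps this in a
# derived table (SELECT ... FROM (SELECT ...)), which is the actual workaround.
keep_last_subq = (
    session.query(func.max(DagRun.start_date))
    .filter(DagRun.external_trigger == false())
    .group_by(DagRun.dag_id)
    .from_self()
)

old_runs = session.query(DagRun).filter(DagRun.start_date.notin_(keep_last_subq))
print(old_runs.count())  # inspect before deleting
```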




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r808476456



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    logger.debug("print entities query: " + str(query))
+    for entry in query.limit(max_rows_to_print):
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41

Review comment:
       no strong opinion on it -- either is fine




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783523446



##########
File path: airflow/cli/cli_parser.py
##########
@@ -1054,6 +1080,14 @@ class GroupCommand(NamedTuple):
         args=(ARG_CLEAR_ONLY,),
     ),
 )
+MAINTENANCE_COMMANDS = (
+    ActionCommand(
+        name='cleanup',

Review comment:
       ```suggestion
           name='cleanup-tables',
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r784333054



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': ImportError,
+        'recency_column': ImportError.timestamp,
+    },
+    'log': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': Log,
+        'recency_column': Log.dttm,
+    },
+    'rendered_task_instance_fields': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': RenderedTaskInstanceFields,
+        'recency_column': RenderedTaskInstanceFields.execution_date,
+    },
+    'sla_miss': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': SlaMiss,
+        'recency_column': SlaMiss.execution_date,
+    },
+    'task_fail': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskFail,
+        'recency_column': TaskFail.execution_date,
+    },
+    'task_instance': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskInstance,
+        'recency_column': TaskInstance.execution_date,

Review comment:
       Updated the dates to use start_date where possible.
   
       Though this introduces an edge case where start_date may be null (e.g. a task instance that never actually started).
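
    (A hedged illustration of that edge case, not necessarily how the PR resolves it. In SQL, `NULL < timestamp` evaluates to unknown, so rows with a null start_date never match the delete filter anyway; an explicit null check just documents that intent.)

```python
from datetime import timedelta

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils import timezone

session = settings.Session()
clean_before_timestamp = timezone.utcnow() - timedelta(days=30)

# TIs that never started have start_date IS NULL and are excluded from
# deletion by both conditions below; isnot(None) makes that explicit.
query = session.query(TaskInstance).filter(
    TaskInstance.start_date.isnot(None),
    TaskInstance.start_date < clean_before_timestamp,
)
print(query.count())
```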




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r784333507



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': ImportError,
+        'recency_column': ImportError.timestamp,
+    },
+    'log': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': Log,
+        'recency_column': Log.dttm,
+    },
+    'rendered_task_instance_fields': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': RenderedTaskInstanceFields,
+        'recency_column': RenderedTaskInstanceFields.execution_date,
+    },
+    'sla_miss': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': SlaMiss,
+        'recency_column': SlaMiss.execution_date,
+    },
+    'task_fail': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskFail,
+        'recency_column': TaskFail.execution_date,
+    },
+    'task_instance': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskInstance,
+        'recency_column': TaskInstance.execution_date,

Review comment:
       Also added logic to print the "config" in dry run, i.e. which columns will be used for which tables.
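
    (Sketch of what such dry-run config output could look like, reusing the `readable_config` property from the diff above; `AirflowConsole.print_as` does exist in `airflow.cli.simple_table`, though this exact call is illustrative.)

```python
from airflow.cli.simple_table import AirflowConsole


def _print_config(*, configs):
    # configs: mapping of table name -> _TableConfig, as in the module above
    data = [x.readable_config for x in configs.values()]
    AirflowConsole().print_as(data=data, output="table")
```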




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r784242426



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': ImportError,
+        'recency_column': ImportError.timestamp,
+    },
+    'log': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': Log,
+        'recency_column': Log.dttm,
+    },
+    'rendered_task_instance_fields': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': RenderedTaskInstanceFields,
+        'recency_column': RenderedTaskInstanceFields.execution_date,
+    },
+    'sla_miss': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': SlaMiss,
+        'recency_column': SlaMiss.execution_date,
+    },
+    'task_fail': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskFail,
+        'recency_column': TaskFail.execution_date,
+    },
+    'task_instance': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskInstance,
+        'recency_column': TaskInstance.execution_date,
+    },
+    'task_reschedule': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskReschedule,
+        'recency_column': TaskReschedule.execution_date,
+    },
+    'xcom': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': XCom,
+        'recency_column': XCom.execution_date,
+    },
+}
+
+
+airflow_executor = str(conf.get("core", "executor"))
+if airflow_executor == "CeleryExecutor":
+    from celery.backends.database.models import Task, TaskSet
+
+    print("Including Celery Modules")
+    try:
+        objects.update(
+            **{
+                'task': {
+                    'keep_last': False,
+                    'keep_last_filters': None,
+                    'keep_last_group_by': None,
+                    'orm_model': Task,
+                    'recency_column': Task.date_done,
+                },
+                'task_set': {
+                    'keep_last': False,
+                    'keep_last_filters': None,
+                    'keep_last_group_by': None,
+                    'orm_model': TaskSet,
+                    'recency_column': TaskSet.date_done,
+                },
+            }
+        )
+    except Exception as e:
+        logging.error(e)
+
+
+session = settings.Session()
+
+
+def print_entities(query: Query, print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    entries_to_delete = query.limit(max_rows_to_print).all()
+    logging.debug("print entities query: " + str(query))
+    for entry in entries_to_delete:  # type: Log
+        print(entry.__dict__)
+
+
+def do_delete(query):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def subquery_keep_last(keep_last_filters, keep_last_group_by):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+    subquery = session.query(func.max(DagRun.execution_date))
+
+    if keep_last_filters is not None:
+        for entry in keep_last_filters:
+            subquery = subquery.filter(entry)
+
+    if keep_last_group_by is not None:
+        subquery = subquery.group_by(keep_last_group_by)
+
+    subquery = subquery.from_self()
+    return subquery
+
+
+def build_query(
+    orm_model, recency_column, keep_last, keep_last_filters, keep_last_group_by, clean_before_timestamp
+):
+    query = session.query(orm_model)
+    conditions = [recency_column < clean_before_timestamp]
+    if keep_last:
+        subquery = subquery_keep_last(
+            keep_last_filters=keep_last_filters, keep_last_group_by=keep_last_group_by
+        )
+        conditions.append(recency_column.notin_(subquery))
+    query = query.filter(and_(*conditions))
+    return query
+
+
+logger = logging.getLogger(__file__)
+
+
+def _cleanup_table(
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    dry_run=True,
+    verbose=False,
+):
+    print()
+    if dry_run:
+        print(f"Performing dry run for table {orm_model.__tablename__!r}")
+    query = build_query(
+        orm_model=orm_model,
+        recency_column=recency_column,
+        keep_last=keep_last,
+        keep_last_filters=keep_last_filters,
+        keep_last_group_by=keep_last_group_by,
+        clean_before_timestamp=clean_before_timestamp,
+    )
+
+    print_entities(query, print_rows=False)
+
+    if not dry_run:
+        do_delete(query)
+
+
+def confirm(date, tables):
+    for_tables = f" for tables {tables!r}" if tables else ''
+    question = '\n'.join(
+        [
+            f"You have requested that we purge all data prior to {date}{for_tables}.",
+            "This is irreversible.  Consider backing up the tables first and / or doing a dry run with option --dry-run.",
+            "Enter 'delete rows' to proceed.",
+        ]
+    )
+    print(question)
+    answer = input().strip()
+    if answer != 'delete rows':
+        raise SystemExit("User did not confirm; exiting.")
+
+
+def run_cleanup(clean_before_timestamp: DateTime, table_names=None, dry_run=False, verbose=False):
+    clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)
+    if not dry_run:
+        confirm(clean_before_timestamp, table_names)

Review comment:
       Yup, it's a good idea. I already added a `confirm` flag so that we can use it from Python (without user input), but skipping confirmation from the CLI is also a good option. Thanks!
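
    (A minimal sketch of that shape, with hypothetical names: programmatic callers pass `confirm=False`, and a CLI flag such as `--yes` could map onto the same parameter.)

```python
def _confirm_delete(*, date, tables):
    question = (
        f"You have requested that we purge all data prior to {date} for tables {tables}.\n"
        "This is irreversible. Type 'delete rows' to proceed."
    )
    answer = input(question + "\n").strip()
    if answer != "delete rows":
        raise SystemExit("User did not confirm; exiting.")


def run_cleanup(clean_before_timestamp, table_names=None, dry_run=False, confirm=True):
    if not dry_run and confirm:
        _confirm_delete(date=clean_before_timestamp, tables=table_names)
    # ... per-table cleanup would run here ...
```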




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] SamWheating commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r802163928



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),

Review comment:
       If we delete rows from `DagRun` and `TaskInstance` which have `start_date < clean_before_timestamp` then it will likely create dangling TaskInstances and TaskReschedules, which have caused issues in the past (for example: https://github.com/apache/airflow/issues/18894)
   
   For a given TaskInstance and its parent DagRun, the `dagrun.start_date` will always be earlier than the `task_instance.start_date`, so if the clean_before_timestamp falls between the two, the DagRun will be deleted while the TaskInstance will stay around.
   
   Do you think that this is safe, or should we do something afterwards to clean up the dangling TIs?
   For example: https://github.com/apache/airflow/pull/18953
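
    (For context, an illustrative query along the lines of what apache/airflow#18953 does, assuming an Airflow version where `TaskInstance` carries `run_id`; this is not code from either PR.)

```python
from airflow import settings
from airflow.models import DagRun, TaskInstance

session = settings.Session()

# Outer-join TIs to their runs; rows with no matching run are dangling.
dangling_tis = (
    session.query(TaskInstance)
    .outerjoin(
        DagRun,
        (TaskInstance.dag_id == DagRun.dag_id) & (TaskInstance.run_id == DagRun.run_id),
    )
    .filter(DagRun.id.is_(None))
)
print(f"{dangling_tis.count()} dangling task instances")
```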




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r802174128



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),

Review comment:
       In the current version of Airflow, there are foreign key relationships defined with `ON DELETE CASCADE`, so there should be no dangling objects, I think.
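
    (Pattern illustration only, not the actual Airflow model definitions: with `ondelete="CASCADE"` on the foreign key, the database removes child rows when the parent `dag_run` row is deleted, which is why the bulk delete shouldn't leave orphans.)

```python
from sqlalchemy import Column, ForeignKeyConstraint, Integer, String, UniqueConstraint
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ParentRun(Base):
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String(250))
    run_id = Column(String(250))
    __table_args__ = (UniqueConstraint("dag_id", "run_id"),)


class ChildTaskInstance(Base):
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String(250))
    run_id = Column(String(250))
    __table_args__ = (
        ForeignKeyConstraint(
            ["dag_id", "run_id"],
            ["dag_run.dag_id", "dag_run.run_id"],
            ondelete="CASCADE",  # DB-level cascade: deleting the run deletes its TIs
        ),
    )
```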




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783736002



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': ImportError,
+        'recency_column': ImportError.timestamp,
+    },
+    'log': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': Log,
+        'recency_column': Log.dttm,
+    },
+    'rendered_task_instance_fields': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': RenderedTaskInstanceFields,
+        'recency_column': RenderedTaskInstanceFields.execution_date,
+    },
+    'sla_miss': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': SlaMiss,
+        'recency_column': SlaMiss.execution_date,
+    },
+    'task_fail': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskFail,
+        'recency_column': TaskFail.execution_date,
+    },
+    'task_instance': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskInstance,
+        'recency_column': TaskInstance.execution_date,

Review comment:
       Shouldn't it be `TI.start_date` here? Otherwise, if I run a backfill job for the last 2 months, it will remove those records too, whereas I might only want to purge records that actually ran more than a month ago. WDYT?
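
    (A toy example of the backfill point, with made-up dates:)

```python
from datetime import datetime

execution_date = datetime(2022, 1, 1)  # logical date of the backfilled run
start_date = datetime(2022, 3, 1)      # when the backfill actually executed
clean_before = datetime(2022, 2, 1)    # purge threshold

print(execution_date < clean_before)  # True:  an execution_date filter purges the fresh backfill
print(start_date < clean_before)      # False: a start_date filter keeps it
```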




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] SamWheating commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r802163928



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),

Review comment:
       If we delete rows from `DagRun` and `TaskInstance` which have `start_date < clean_before_timestamp` then I think it will likely lead to dangling TaskInstances and TaskReschedules, which have caused issues in the past (for example: https://github.com/apache/airflow/issues/18894)
   
   For a given TaskInstance and its parent DagRun, the `dagrun.start_date` will always be earlier than the `task_instance.start_date`, so if the clean_before_timestamp falls between the two, the DagRun will be deleted while the TaskInstance will stay around.
   
   Do you think that this is safe, or should we do something afterwards to clean up the dangling TIs?
   For example: https://github.com/apache/airflow/pull/18953







[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r807326376



##########
File path: airflow/cli/commands/db_command.py
##########
@@ -94,3 +95,26 @@ def shell(args):
 def check(_):
     """Runs a check command that checks if db is available."""
     db.check()
+
+
+@cli_utils.action_cli(check_db=False)
+def clean(args):
+    """Upgrades the metadata database"""
+    print(f"DB: {settings.engine.url!r}")
+    db.upgradedb()
+
+
+# lazily imported by CLI parser for `help` command
+all_tables = sorted(config_dict)
+
+
+@cli_utils.action_cli(check_db=False)
+def cleanup_tables(args):
+    """Purges old records in metastore database"""

Review comment:
       nit: There are different versions of the same name: "metadata database" (L102), "metastore database" (L113) and "metastore tables" (L1324 of `airflow/cli/cli_parser.py`)







[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r807331051



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    logger.debug("print entities query: " + str(query))
+    for entry in query.limit(max_rows_to_print):
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41

Review comment:
       Can we add a summary of that issue here, as this URL can become a 404!
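   
   For the record, the gist: MySQL rejects a DELETE whose subquery selects from the same table being deleted from (error 1093, "You can't specify target table ... for update in FROM clause"). Wrapping the subquery in an extra SELECT -- a derived table, which is what `Query.from_self()` emits -- sidesteps the restriction. A minimal sketch, reusing names from this module:
   
       # fails on MySQL: DELETE ... WHERE col NOT IN (SELECT ... FROM same_table)
       subquery = session.query(func.max(DagRun.execution_date)).group_by(DagRun.dag_id)
       # works: the subquery is now SELECT * FROM (SELECT ...) AS anon_1
       safe_subquery = subquery.from_self()
       session.query(DagRun).filter(
           DagRun.execution_date.notin_(safe_subquery)
       ).delete(synchronize_session=False)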







[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783602548



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': ImportError,
+        'recency_column': ImportError.timestamp,
+    },
+    'log': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': Log,
+        'recency_column': Log.dttm,
+    },
+    'rendered_task_instance_fields': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': RenderedTaskInstanceFields,
+        'recency_column': RenderedTaskInstanceFields.execution_date,
+    },
+    'sla_miss': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': SlaMiss,
+        'recency_column': SlaMiss.execution_date,
+    },
+    'task_fail': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskFail,
+        'recency_column': TaskFail.execution_date,
+    },
+    'task_instance': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskInstance,
+        'recency_column': TaskInstance.execution_date,

Review comment:
       tell me about the unsureness?







[GitHub] [airflow] mik-laj commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783519744



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob

Review comment:
       It is not needed, because we don't maintain compatibility with Airflow 1.10.







[GitHub] [airflow] SamWheating commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r802198253



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),

Review comment:
       Ah, that's the part I missed. The dangling rows can only exist when dropping data from pre-2.2 Airflow.
   
   Thanks for confirming!







[GitHub] [airflow] uranusjr commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783671395



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': ImportError,
+        'recency_column': ImportError.timestamp,
+    },
+    'log': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': Log,
+        'recency_column': Log.dttm,
+    },
+    'rendered_task_instance_fields': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': RenderedTaskInstanceFields,
+        'recency_column': RenderedTaskInstanceFields.execution_date,
+    },
+    'sla_miss': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': SlaMiss,
+        'recency_column': SlaMiss.execution_date,
+    },
+    'task_fail': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskFail,
+        'recency_column': TaskFail.execution_date,
+    },
+    'task_instance': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskInstance,
+        'recency_column': TaskInstance.execution_date,

Review comment:
       This should work; there is existing code doing it. The association proxy does not always behave like a real column, but it does work for this specific usage.
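   
   E.g. a sketch (assuming the 2.2+ models, with `session` an open ORM session; `TaskInstance.execution_date` is the proxy onto `DagRun.execution_date`):
   
       from airflow.models import TaskInstance
       from airflow.utils import timezone
   
       cutoff = timezone.datetime(2021, 1, 1)
       # the association proxy is usable in a filter much like a real column:
       old_tis = session.query(TaskInstance).filter(TaskInstance.execution_date < cutoff)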







[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783727093



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {

Review comment:
       How does a dataclass sound instead of namedtuples, @uranusjr? Now that we are Python 3.7+ in main -- we can use dataclasses freely
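   
   Something along these lines (a sketch; field names illustrative, defaults keep the common case terse):
   
       from dataclasses import dataclass
       from typing import Any, Optional
   
       @dataclass
       class TableConfig:
           orm_model: Any
           recency_column: Any
           keep_last: bool = False
           keep_last_filters: Optional[Any] = None
           keep_last_group_by: Optional[Any] = None
   
       # common case stays terse:
       # TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat)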







[GitHub] [airflow] kaxil removed a comment on pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil removed a comment on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1012518905


   > We **just** had discussion that we need it yesterday with @mik-laj and @mhenc :) 
   
   We have got more stuff planned to make Airflow upgrades more reliable - stay tuned, this is just the start :)
   
   





[GitHub] [airflow] mik-laj commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783528077



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': ImportError,
+        'recency_column': ImportError.timestamp,
+    },
+    'log': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': Log,
+        'recency_column': Log.dttm,
+    },
+    'rendered_task_instance_fields': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': RenderedTaskInstanceFields,
+        'recency_column': RenderedTaskInstanceFields.execution_date,
+    },
+    'sla_miss': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': SlaMiss,
+        'recency_column': SlaMiss.execution_date,
+    },
+    'task_fail': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskFail,
+        'recency_column': TaskFail.execution_date,
+    },
+    'task_instance': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskInstance,
+        'recency_column': TaskInstance.execution_date,

Review comment:
       I'm not sure about it. It is an association proxy.










[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r808251808



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+
+
+def _print_entities(*, query: "Query", print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    logger.debug("print entities query: " + str(query))
+    for entry in query.limit(max_rows_to_print):
+        print(entry.__dict__)
+
+
+def _do_delete(*, query, session):
+    print("Performing Delete...")
+    # using bulk delete
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41

Review comment:
       and keep the URL too? Or remove it?







[GitHub] [airflow] kaxil commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r807327758



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters:

Review comment:
       missing description







[GitHub] [airflow] potiuk commented on pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1040511386


   Static checks :) ? 





[GitHub] [airflow] uranusjr commented on a change in pull request #20838: Add `db clean` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r811020151



##########
File path: airflow/utils/db_cleanup.py
##########
@@ -0,0 +1,307 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+from contextlib import AbstractContextManager
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from pendulum import DateTime
+from sqlalchemy import and_, false, func
+from sqlalchemy.exc import OperationalError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.jobs.base_job import BaseJob
+from airflow.models import (
+    Base,
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    TaskReschedule,
+    XCom,
+)
+from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query, Session
+    from sqlalchemy.orm.attributes import InstrumentedAttribute
+    from sqlalchemy.sql.schema import Column
+
+
+@dataclass
+class _TableConfig:
+    """
+    Config class for performing cleanup on a table
+
+    :param orm_model: the table
+    :param recency_column: date column to filter by
+    :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+    :param keep_last_filters: the "keep last" functionality will preserve the most recent record
+        in the table.  to ignore certain records even if they are the latest in the table, you can
+        supply additional filters here (e.g. externally triggered dag runs)
+    :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
+        If False then the exception will go uncaught.
+    """
+
+    orm_model: Base
+    recency_column: Union["Column", "InstrumentedAttribute"]
+    keep_last: bool = False
+    keep_last_filters: Optional[Any] = None
+    keep_last_group_by: Optional[Any] = None
+    warn_if_missing: bool = False
+
+    def __lt__(self, other):
+        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+
+    @property
+    def readable_config(self):
+        return dict(
+            table=self.orm_model.__tablename__,
+            recency_column=str(self.recency_column),
+            keep_last=self.keep_last,
+            keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
+            keep_last_group_by=str(self.keep_last_group_by),
+            warn_if_missing=str(self.warn_if_missing),
+        )
+
+
+config_list: List[_TableConfig] = [
+    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
+    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(
+        orm_model=DagRun,
+        recency_column=DagRun.start_date,
+        keep_last=True,
+        keep_last_filters=[DagRun.external_trigger == false()],
+        keep_last_group_by=DagRun.dag_id,
+    ),
+    _TableConfig(orm_model=ImportError, recency_column=ImportError.timestamp),
+    _TableConfig(orm_model=Log, recency_column=Log.dttm),
+    _TableConfig(
+        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+    ),
+    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
+    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
+    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
+    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
+    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
+]
+try:
+    from celery.backends.database.models import Task, TaskSet
+
+    config_list.extend(
+        [
+            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
+            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
+        ]
+    )
+except ImportError:
+    pass
+
+config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}

Review comment:
       Any particular reason we need to sort this?
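   
   (For context, the sorted order appears to be what the CLI parser lazily imports for its help text -- roughly:)
   
       # in airflow/cli/commands/db_command.py:
       all_tables = sorted(config_dict)  # alphabetical table list shown in `--help`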







[GitHub] [airflow] SamWheating commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r784208441



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+orm_model: the table
+recency_column: date column to filter by
+keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
+keep_last_filters:
+keep_last_group_by: if keeping the last record, can keep the last record for each group
+"""
+
+
+from pendulum import DateTime
+from sqlalchemy.orm import Query
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.models import (
+    DagModel,
+    DagRun,
+    ImportError,
+    Log,
+    RenderedTaskInstanceFields,
+    SlaMiss,
+    TaskFail,
+    TaskInstance,
+    XCom,
+)
+
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob
+
+import logging
+
+from sqlalchemy import and_, func
+
+from airflow.models import TaskReschedule
+from airflow.utils import timezone
+
+now = timezone.utcnow
+
+
+objects = {
+    'job': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': BaseJob,
+        'recency_column': BaseJob.latest_heartbeat,
+    },
+    'dag': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': DagModel,
+        'recency_column': DagModel.last_parsed_time,
+    },
+    'dag_run': {
+        'keep_last': True,
+        "keep_last_filters": [DagRun.external_trigger.is_(False)],
+        "keep_last_group_by": DagRun.dag_id,
+        'orm_model': DagRun,
+        'recency_column': DagRun.execution_date,
+    },
+    'import_error': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': ImportError,
+        'recency_column': ImportError.timestamp,
+    },
+    'log': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': Log,
+        'recency_column': Log.dttm,
+    },
+    'rendered_task_instance_fields': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': RenderedTaskInstanceFields,
+        'recency_column': RenderedTaskInstanceFields.execution_date,
+    },
+    'sla_miss': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': SlaMiss,
+        'recency_column': SlaMiss.execution_date,
+    },
+    'task_fail': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskFail,
+        'recency_column': TaskFail.execution_date,
+    },
+    'task_instance': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskInstance,
+        'recency_column': TaskInstance.execution_date,
+    },
+    'task_reschedule': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': TaskReschedule,
+        'recency_column': TaskReschedule.execution_date,
+    },
+    'xcom': {
+        'keep_last': False,
+        'keep_last_filters': None,
+        'keep_last_group_by': None,
+        'orm_model': XCom,
+        'recency_column': XCom.execution_date,
+    },
+}
+
+
+airflow_executor = str(conf.get("core", "executor"))
+if airflow_executor == "CeleryExecutor":
+    from celery.backends.database.models import Task, TaskSet
+
+    print("Including Celery Modules")
+    try:
+        objects.update(
+            **{
+                'task': {
+                    'keep_last': False,
+                    'keep_last_filters': None,
+                    'keep_last_group_by': None,
+                    'orm_model': Task,
+                    'recency_column': Task.date_done,
+                },
+                'task_set': {
+                    'keep_last': False,
+                    'keep_last_filters': None,
+                    'keep_last_group_by': None,
+                    'orm_model': TaskSet,
+                    'recency_column': TaskSet.date_done,
+                },
+            }
+        )
+    except Exception:
+        logging.exception("Could not add Celery tables to cleanup config")
+
+
+session = settings.Session()
+
+
+def print_entities(query: Query, print_rows=False):
+    num_entities = query.count()
+    print(f"Found {num_entities} rows meeting deletion criteria.")
+    if not print_rows:
+        return
+    max_rows_to_print = 100
+    if num_entities > 0:
+        print(f"Printing first {max_rows_to_print} rows.")
+    entries_to_delete = query.limit(max_rows_to_print).all()
+    logging.debug("print entities query: %s", query)
+    for entry in entries_to_delete:
+        print(entry.__dict__)
+
+
+def do_delete(query):
+    print("Performing Delete...")
+    # bulk delete; synchronize_session=False skips syncing the session's
+    # in-memory state, which is safe since we commit immediately afterwards
+    query.delete(synchronize_session=False)
+    session.commit()
+    print("Finished Performing Delete")
+
+
+def subquery_keep_last(keep_last_filters, keep_last_group_by):
+    # workaround for MySQL "table specified twice" issue
+    # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+    subquery = session.query(func.max(DagRun.execution_date))
+
+    if keep_last_filters is not None:
+        for entry in keep_last_filters:
+            subquery = subquery.filter(entry)
+
+    if keep_last_group_by is not None:
+        subquery = subquery.group_by(keep_last_group_by)
+
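+    # ``from_self`` wraps the subquery in a derived table (SELECT ... FROM (SELECT ...)),
+    # so the target table is not referenced twice in the eventual DELETE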
+    subquery = subquery.from_self()
+    return subquery
+
+
+def build_query(
+    orm_model, recency_column, keep_last, keep_last_filters, keep_last_group_by, clean_before_timestamp
+):
+    query = session.query(orm_model)
+    conditions = [recency_column < clean_before_timestamp]
+    if keep_last:
+        subquery = subquery_keep_last(
+            keep_last_filters=keep_last_filters, keep_last_group_by=keep_last_group_by
+        )
+        conditions.append(recency_column.notin_(subquery))
+    query = query.filter(and_(*conditions))
+    return query
+
+
+logger = logging.getLogger(__file__)
+
+
+def _cleanup_table(
+    orm_model,
+    recency_column,
+    keep_last,
+    keep_last_filters,
+    keep_last_group_by,
+    clean_before_timestamp,
+    dry_run=True,
+    verbose=False,
+):
+    print()
+    if dry_run:
+        print(f"Performing dry run for table {orm_model.__tablename__!r}")
+    query = build_query(
+        orm_model=orm_model,
+        recency_column=recency_column,
+        keep_last=keep_last,
+        keep_last_filters=keep_last_filters,
+        keep_last_group_by=keep_last_group_by,
+        clean_before_timestamp=clean_before_timestamp,
+    )
+
+    print_entities(query, print_rows=verbose)
+
+    if not dry_run:
+        do_delete(query)
+
+
+def confirm(date, tables):
+    for_tables = f" for tables {tables!r}" if tables else ''
+    question = '\n'.join(
+        [
+            f"You have requested that we purge all data prior to {date}{for_tables}.",
+            "This is irreversible.  Consider backing up the tables first and / or doing a dry run with option --dry-run.",
+            "Enter 'delete rows' to proceed.",
+        ]
+    )
+    print(question)
+    answer = input().strip()
+    if answer != 'delete rows':
+        raise SystemExit("User did not confirm; exiting.")
+
+
+def run_cleanup(clean_before_timestamp: DateTime, table_names=None, dry_run=False, verbose=False):
+    clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)
+    if not dry_run:
+        confirm(clean_before_timestamp, table_names)

Review comment:
       Thoughts on adding a way to bypass this?
   
    For example, if we wanted to have a DAG or CronJob which ran a cleanup command daily? I know lots of other CLIs have a `--yes` flag to skip the confirmation step.
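   
   A minimal sketch of what that could look like (`skip_confirm` is an assumed parameter name, not part of this PR; `confirm` and `timezone` are the module's own):
   
   ```python
   def run_cleanup(clean_before_timestamp, table_names=None, dry_run=False, verbose=False, skip_confirm=False):
       clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)
       if not dry_run and not skip_confirm:
           # prompt only when actually deleting and the caller has not opted out
           confirm(clean_before_timestamp, table_names)
       # ... proceed with the per-table cleanup as above ...
   ```
   
   A `--yes` CLI flag would then simply map to `skip_confirm=True`.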







[GitHub] [airflow] mik-laj commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783528233



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+logger = logging.getLogger()

Review comment:
       ```suggestion
   logger = logging.getLogger(__file__)
   ```
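   
   For context: `__file__` yields a path-based logger name, while `__name__` yields the dotted module path, which is the more common convention:
   
   ```python
   import logging
   
   logging.getLogger(__file__)  # name is a file path, e.g. ".../airflow/utils/metastore_cleanup.py"
   logging.getLogger(__name__)  # name is the module path, e.g. "airflow.utils.metastore_cleanup"
   ```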







[GitHub] [airflow] dstandish commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783602645



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+try:
+    from airflow.jobs import BaseJob
+except Exception as e:
+    from airflow.jobs.base_job import BaseJob

Review comment:
       thanks







[GitHub] [airflow] uranusjr commented on a change in pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#discussion_r783730384



##########
File path: airflow/utils/metastore_cleanup.py
##########
@@ -0,0 +1,289 @@
+objects = {

Review comment:
       Dataclasses are a good choice as well!
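   
   For illustration, a minimal sketch of the dataclass route (`_TableConfig` is an assumed name, not from this PR):
   
   ```python
   from dataclasses import dataclass
   from typing import Any, List, Optional
   
   from airflow.models import DagRun
   
   
   @dataclass
   class _TableConfig:
       """One cleanup target; mirrors the keys of the ``objects`` dict."""
   
       orm_model: Any
       recency_column: Any
       keep_last: bool = False
       keep_last_filters: Optional[List[Any]] = None
       keep_last_group_by: Optional[Any] = None
   
   
   objects = {
       'dag_run': _TableConfig(
           orm_model=DagRun,
           recency_column=DagRun.execution_date,
           keep_last=True,
           keep_last_filters=[DagRun.external_trigger.is_(False)],
           keep_last_group_by=DagRun.dag_id,
       ),
   }
   ```
   
   A misspelled or missing key then fails at import time instead of at lookup time.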







[GitHub] [airflow] dstandish commented on pull request #20838: Add `maintenance cleanup` CLI command for purging old data

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #20838:
URL: https://github.com/apache/airflow/pull/20838#issuecomment-1022847551


   Okie doke, I think this is ready for a look. I had to write a lot of tests, but I think the coverage is decent.
   
   I have toyed with the idea of moving this to the `airflow db` subcommand for now, since it's very much db-specific and we don't have any other maintenance commands at the moment. If we add a lot of maintenancey stuff later, we can always move the commands. I welcome opinions on this.
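   
   For example, if it lands under the `db` group, a daily cron job or DAG task could call the underlying function directly (module path as of this PR; treat it as illustrative):
   
   ```python
   from airflow.utils import timezone
   from airflow.utils.metastore_cleanup import run_cleanup
   
   run_cleanup(
       clean_before_timestamp=timezone.parse("2022-01-01"),
       table_names=['dag_run', 'task_instance'],
       dry_run=True,  # only count matching rows; set False to actually delete
   )
   ```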
   

