You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by rk...@apache.org on 2023/08/22 21:35:31 UTC

[incubator-sdap-nexus] branch master updated: SDAP-457 - Script to purge DOMS execution data (#241)

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 76f3e4d  SDAP-457 - Script to purge DOMS execution data (#241)
76f3e4d is described below

commit 76f3e4d617abbf283804d6f52aa2eff86e15a744
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Tue Aug 22 14:35:26 2023 -0700

    SDAP-457 - Script to purge DOMS execution data (#241)
    
    * SDAP-457: Script to drop old & failed executions & associated data
    
    * Script improvements
    
    * Changelog
    
    * ASF header
    
    * Log to stdout
    
    * Added readme & requirements.txt
    
    * Added non-interactive option
    
    * Dockerfile and associated files + changes
    
    * Helm template and values to deploy script as CronJob
    
    * Mentioned helm template in changelog
    
    * Corrected incorrect name in domspurge.yml
    
    * Script exits nonzero on error
    
    * Improved handling of time
    
    Ensured computation of cutoff time with --before-months is in the correct timezone
    Added note for formatting & tz of --before arg
    
    * Changed imagepullpolicy to always
    
    * Changed handling of null executions
    
    We should hold onto those based on their start time because a) the execution may be in progress as the script runs and b) with LJ changes on the way we should hold onto failed & cancelled executions for a while too for the `/job` status endpoint
    
    * Fix missed change from copy/paste
    
    ---------
    
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
 CHANGELOG.md                                       |   2 +
 .../algorithms/doms/DomsInitialization.py          |   3 +-
 helm/templates/domspurge.yml                       |  35 ++
 helm/values.yaml                                   |  10 +
 tools/domspurge/Dockerfile                         |  31 ++
 tools/domspurge/README.md                          |  37 +++
 tools/domspurge/entrypoint.sh                      |  30 ++
 tools/domspurge/purge.py                           | 368 +++++++++++++++++++++
 tools/domspurge/requirements.txt                   |   4 +
 9 files changed, 518 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cbcc2aa..4c5cb88 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -36,6 +36,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - SDAP-454: Added new query parameter `prioritizeDistance` to matchup algorithm
 - SDAP-476: Support for service accounts for handling AWS credentials
 - SDAP-459: Added explicit definitions of min/max lat/lon values in nexustiles Solr collection creation script
+- SDAP-457: Added tool script to purge DOMS execution data. Can remove executions before a certain datetime, before a number of months in the past, uncompleted executions, or purge all execution data.
+  - Also added Helm template and values to deploy this script as a `CronJob` in k8s.
 ### Changed
 - SDAP-443:
   - Replacing DOMS terminology with CDMS terminology:
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index ed7db20..74a2aa6 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -30,10 +30,9 @@ from webservice.NexusHandler import nexus_initializer
 @nexus_initializer
 class DomsInitializer:
     def __init__(self):
-        pass
+        self.log = logging.getLogger(__name__)
 
     def init(self, config):
-        self.log = logging.getLogger(__name__)
         self.log.info("*** STARTING DOMS INITIALIZATION ***")
 
         domsconfig = configparser.SafeConfigParser()
diff --git a/helm/templates/domspurge.yml b/helm/templates/domspurge.yml
new file mode 100644
index 0000000..5c9f827
--- /dev/null
+++ b/helm/templates/domspurge.yml
@@ -0,0 +1,35 @@
+{{ if .Values.cronJobs.domsPurge.enabled }}
+
+# Periodically purges old DOMS/CDMS execution data with the tools/domspurge image (values: cronJobs.domsPurge).
+# Templated values are piped through `quote`: Kubernetes only accepts strings for env values, so an unquoted
+# number (keepBeforeMonths) or boolean (keepUncompleted) would be rejected, and credentials containing
+# YAML-special characters could otherwise be misparsed.
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+  name: doms-purge
+spec:
+  schedule: {{ .Values.cronJobs.domsPurge.schedule | quote }}
+  concurrencyPolicy: {{ .Values.cronJobs.domsPurge.concurrency }}
+  jobTemplate:
+    spec:
+      template:
+        spec:
+          containers:
+          - name: doms-purge
+            image: {{ .Values.cronJobs.domsPurge.image | quote }}
+            imagePullPolicy: Always
+            env:
+              - name: CASSANDRA_CONTACT_POINTS
+                value: {{ include "nexus.urls.cassandra" . | quote }}
+              - name: CASSANDRA_USERNAME
+                value: {{ include "nexus.credentials.cassandra.username" . | quote }}
+              - name: CASSANDRA_PASSWORD
+                value: {{ include "nexus.credentials.cassandra.password" . | quote }}
+              - name: "YES"
+                value: "true"
+              - name: BEFORE_MONTHS
+                value: {{ .Values.cronJobs.domsPurge.keepBeforeMonths | quote }}
+              {{ if .Values.cronJobs.domsPurge.keepUncompleted }}
+              - name: KEEP_FAILED
+                value: "true"
+              {{ end }}
+          restartPolicy: {{ .Values.cronJobs.domsPurge.restartPolicy }}
+
+{{ end }}
diff --git a/helm/values.yaml b/helm/values.yaml
index 95a9e6f..4105362 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -207,3 +207,13 @@ cassandra:
     limits:
       cpu: 1
       memory: 8Gi
+
+cronJobs:
+  # CronJob defined in templates/domspurge.yml that runs the tools/domspurge purge script
+  domsPurge:
+    enabled: true
+    # Standard cron expression; "0 0 * * *" runs once per day at 00:00
+    schedule: "0 0 * * *"
+    # Passed to the script as --before-months (BEFORE_MONTHS): data from before this many months ago is purged
+    keepBeforeMonths: "2"
+    # When true, sets KEEP_FAILED so the script runs with --keep-failed (uncompleted executions are kept)
+    keepUncompleted: false
+    image: "nexusjpl/domspurge:1.0.0"
+    restartPolicy: Never
+    # Used as the CronJob concurrencyPolicy; Forbid skips a run while the previous one is still active
+    concurrency: Forbid
diff --git a/tools/domspurge/Dockerfile b/tools/domspurge/Dockerfile
new file mode 100644
index 0000000..78dc3cd
--- /dev/null
+++ b/tools/domspurge/Dockerfile
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.8.13
+
+WORKDIR /domspurge
+
+# Install dependencies first so this layer stays cached when only the script / initializer changes
+COPY tools/domspurge/requirements.txt /domspurge/requirements.txt
+RUN pip3 install --no-cache-dir -r requirements.txt
+
+COPY tools/domspurge/purge.py /domspurge/purge.py
+COPY tools/domspurge/entrypoint.sh /domspurge/entrypoint.sh
+COPY analysis/webservice/algorithms/doms/DomsInitialization.py /domspurge/DomsInitialization.py
+
+# We just need the keyspace & table initialization. Remove references to other SDAP modules.
+# Doing this because if the schemas change, we want it to be reflected in this utility
+RUN sed '/nexus_initializer/d' -i /domspurge/DomsInitialization.py
+
+ENTRYPOINT ["/bin/bash", "/domspurge/entrypoint.sh"]
diff --git a/tools/domspurge/README.md b/tools/domspurge/README.md
new file mode 100644
index 0000000..92f7cfb
--- /dev/null
+++ b/tools/domspurge/README.md
@@ -0,0 +1,37 @@
+# Purge DOMS/CDMS Execution Data
+
+## Prerequisites
+
+_If you are running the `purge.py` script from within the nexus-webapp-driver Docker image, the following prerequisites are not necessary._
+
+* Run `python setup.py install` in `incubator-sdap-nexus/analysis` 
+* Run `pip install -r requirements.txt` in `incubator-sdap-nexus/tools/domspurge`
+
+## Usage
+`python purge.py -u USERNAME -p PASSWORD [CASSANDRA ARGS...] (--before DATETIME | --before-months MONTHS | --keep-completed | --all) [--keep-failed] [--dry-run] [-y]`
+
+### Options
+- `-u & -p`: Cassandra username and password
+- One of:
+  - `--before`: Datetime (e.g., '2023-04-11T19:50:00') before which all data will be purged. Should be entered in UTC; do not specify a timezone.
+  - `--before-months`: Number of months prior to current date before which all data will be purged. 
+  - `--keep-completed`: Keep all completed executions. (only purge uncompleted executions)
+  - `--all`: Purge ALL data. (drops and re-creates keyspace)
+- Cassandra args (optional)
+  - `--cassandra`: Cassandra hostname(s) or IP(s). Multiple hosts can be given as separate args or as a comma-separated list (e.g., `host1 host2 host3` or `host1,host2,host3`). (Default: localhost)
+  - `-k / --cassandraKeyspace`: Cassandra keyspace for storing DOMS data. (Default: doms)
+  - `--cassandraPort`: Port used to connect to Cassandra. (Default: 9042)
+  - `--cassandraProtocolVersion`: The version of the Cassandra protocol the driver should use. (Default: 3)
+- Additional args
+  - `--keep-failed`: Do not purge uncompleted executions (by default, uncompleted executions that started before the cutoff are purged too). Incompatible with `--keep-completed` and `--all`.
+  - `--dry-run`: Only print the execution ids to be deleted / DB operations to be performed and exit. Do not actually alter the DB
+  - `-y / --yes`: Do not prompt user for confirmation.
+
+## Build Docker Image
+
+You can build an image for this script to run it in a Kubernetes CronJob.
+
+```shell
+cd incubator-sdap-nexus  # build from the repository root
+docker build . -f tools/domspurge/Dockerfile -t sdap-local/domspurge:<tag>
+```
diff --git a/tools/domspurge/entrypoint.sh b/tools/domspurge/entrypoint.sh
new file mode 100644
index 0000000..f5fe3b3
--- /dev/null
+++ b/tools/domspurge/entrypoint.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Build the purge.py argument list from environment variables (e.g. those set by the Helm CronJob template).
+# A bash array keeps each value as exactly one argument even if it contains spaces or glob characters
+# (a Cassandra password, a '--before' datetime with a space), which unquoted $(...) substitution would
+# split or expand.
+args=()
+[[ -n "$CASSANDRA_CONTACT_POINTS" ]] && args+=("--cassandra=$CASSANDRA_CONTACT_POINTS")
+[[ -n "$CASSANDRA_PORT" ]] && args+=("--cassandraPort=$CASSANDRA_PORT")
+[[ -n "$CASSANDRA_KEYSPACE" ]] && args+=("--cassandraKeyspace=$CASSANDRA_KEYSPACE")
+[[ -n "$CASSANDRA_USERNAME" ]] && args+=("--cassandra-username=$CASSANDRA_USERNAME")
+[[ -n "$CASSANDRA_PASSWORD" ]] && args+=("--cassandra-password=$CASSANDRA_PASSWORD")
+[[ -n "$CASSANDRA_PROTOCOL" ]] && args+=("--cassandraProtocolVersion=$CASSANDRA_PROTOCOL")
+[[ -n "$YES" ]] && args+=(-y)
+[[ -n "$DRY_RUN" ]] && args+=(--dry-run)
+[[ -n "$BEFORE" ]] && args+=("--before=$BEFORE")
+[[ -n "$BEFORE_MONTHS" ]] && args+=("--before-months=$BEFORE_MONTHS")
+[[ -n "$KEEP_COMPLETED" ]] && args+=(--keep-completed)
+[[ -n "$KEEP_FAILED" ]] && args+=(--keep-failed)
+[[ -n "$PURGE_ALL" ]] && args+=(--all)
+
+# exec replaces the shell, so purge.py receives signals directly and its exit status is the container's
+exec python /domspurge/purge.py "${args[@]}"
diff --git a/tools/domspurge/purge.py b/tools/domspurge/purge.py
new file mode 100644
index 0000000..4fb2fc3
--- /dev/null
+++ b/tools/domspurge/purge.py
@@ -0,0 +1,368 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import argparse
+import json
+import logging
+import sys
+from datetime import datetime
+from typing import Tuple, List
+
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
+from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy
+from dateutil import parser as du_parser
+from dateutil.relativedelta import relativedelta
+from six.moves import input
+from tqdm import tqdm
+
+try:
+    # Prefer the installed SDAP package. Its NexusHandler logger is raised to CRITICAL before the import,
+    # presumably to keep webapp-related log output out of this tool's output.
+    logging.getLogger('webservice.NexusHandler').setLevel(logging.CRITICAL)
+    from webservice.algorithms.doms.DomsInitialization import DomsInitializer
+except ImportError:
+    # Fallback for the standalone Docker image, which copies DomsInitialization.py next to this script
+    from DomsInitialization import DomsInitializer
+
+logging.basicConfig(level=logging.INFO, stream=sys.stdout)
+log = logging.getLogger(__name__)
+# Set from the --dry-run and -y/--yes CLI flags in parse_args()
+dry_run = False
+non_interactive = False
+
+
+def get_confirmation(prompt_string='Continue? [y]/n: '):
+    """Ask a yes/no question and return True unless the user answers 'n'.
+
+    Returns True immediately when running non-interactively (-y/--yes). An empty answer counts as yes.
+    Answers are case-insensitive and surrounding whitespace is ignored; anything other than y/n/empty
+    re-prompts. If stdin is closed (EOF), nobody can confirm, so this is treated as 'n'.
+
+    :param prompt_string: Text shown to the user
+    :return: True to proceed, False to abort
+    """
+    if non_interactive:
+        return True
+
+    while True:
+        try:
+            answer = input(prompt_string).strip().lower()
+        except EOFError:
+            log.warning('No input available to confirm the operation; treating as "n". Use -y to skip prompts.')
+            return False
+
+        if answer in ('y', 'n', ''):
+            return answer != 'n'
+
+
+def main(args, before, keep_completed, keep_failed, purge_all, recreate):
+    """Connect to Cassandra and purge DOMS execution data according to the selected criteria.
+
+    :param args: Parsed CLI namespace (hosts, port, pv, keyspace, username, password)
+    :param before: Datetime cutoff (expected naive, in UTC), or None when not purging by time
+    :param keep_completed: Only purge uncompleted executions
+    :param keep_failed: Do not purge uncompleted executions
+    :param purge_all: Drop and recreate the whole keyspace
+    :param recreate: Only (re)create the keyspace and its tables, then exit
+
+    Terminates the process via sys.exit(): status 0 on success, dry run, or user cancellation, and
+    status 1 if no Cassandra host can be reached.
+    """
+    log.info('Connecting to Cassandra cluster')
+
+    dc_policy = RoundRobinPolicy()
+    token_policy = TokenAwarePolicy(dc_policy)
+
+    if args.username and args.password:
+        auth_provider = PlainTextAuthProvider(username=args.username, password=args.password)
+    else:
+        auth_provider = None
+
+    # Hosts may be passed as several args and/or comma-separated lists; skip empty entries (e.g. 'a,b,')
+    contact_points = [host.strip() for host_list in args.hosts for host in host_list.split(',') if host.strip()]
+
+    try:
+        with Cluster(contact_points,
+                     port=int(args.port),
+                     execution_profiles={
+                         EXEC_PROFILE_DEFAULT: ExecutionProfile(
+                             load_balancing_policy=token_policy,
+                             request_timeout=60.0,
+                         )
+                     },
+                     protocol_version=int(args.pv),
+                     auth_provider=auth_provider) as cluster:
+            # When recreating, the keyspace may have been dropped already, so connect without selecting it
+            session = cluster.connect() if recreate else cluster.connect(args.keyspace)
+
+            log.info('Connected successfully to Cassandra')
+
+            if recreate:
+                log.info('Recreating doms keyspace')
+                create_keyspace(session, args.keyspace)
+                sys.exit(0)
+
+            log.info('Determining the number of executions that will be dropped')
+
+            execution_count, ids = count_executions(session, before, keep_completed, keep_failed, purge_all)
+
+            if dry_run:
+                # purge_all drops the keyspace even when it holds no executions, so report that first
+                if purge_all:
+                    log.info(f'The \'{args.keyspace}\' keyspace will be dropped then recreated w/ all needed tables')
+                elif execution_count == 0:
+                    log.info('No executions will be deleted with the provided criteria')
+                else:
+                    log.info(f'The following executions would be deleted: \n'
+                             f'{json.dumps([str(rid) for rid in ids], indent=4)} \n'
+                             f'Total: {len(ids):,}')
+
+                sys.exit(0)
+
+            if execution_count == 0 and not purge_all:
+                log.info('No executions will be deleted with the provided criteria')
+                sys.exit(0)
+            elif execution_count == 0 and purge_all:
+                if not get_confirmation('No executions will be deleted with the provided criteria. Do you still wish '
+                                        f'to drop & recreate the \'{args.keyspace}\' keyspace? [y]/n: '):
+                    sys.exit(0)
+            else:
+                if not get_confirmation(f'{execution_count:,} executions selected for deletion. Continue? [y]/n: '):
+                    sys.exit(0)
+
+            if purge_all:
+                purge_all_data(session, args.keyspace)
+            else:
+                for row_id in tqdm(ids, ascii=True, desc='Executions deleted', ncols=80, unit='Execution'):
+                    delete_execution(session, row_id)
+
+                log.info(f'Successfully deleted the following executions: \n'
+                         f'{json.dumps([str(rid) for rid in ids], indent=4)} \n'
+                         f'Total: {len(ids):,}')
+    except NoHostAvailable:
+        log.exception('Could not connect to any Cassandra host')
+        sys.exit(1)
+
+
+# Prepared DELETE statements keyed by id(session). Each statement only needs to be prepared once per session;
+# re-preparing it for every execution costs extra round trips to the cluster.
+_DELETE_STATEMENTS = {}
+
+
+def _get_delete_statements(session):
+    """Return the prepared per-execution DELETE statements for this session, preparing them on first use."""
+    statements = _DELETE_STATEMENTS.get(id(session))
+
+    if statements is None:
+        # Dependent tables first, doms_executions last, so a partially failed delete leaves the execution listed
+        statements = [
+            session.prepare('DELETE FROM doms_data WHERE execution_id=?;'),
+            session.prepare('DELETE FROM doms_execution_stats WHERE execution_id=?;'),
+            session.prepare('DELETE FROM doms_params WHERE execution_id=?;'),
+            session.prepare('DELETE FROM doms_executions WHERE id=?;'),
+        ]
+        _DELETE_STATEMENTS[id(session)] = statements
+
+    return statements
+
+
+def delete_execution(session, row_id):
+    """Delete one execution and its rows in doms_data, doms_execution_stats and doms_params.
+
+    :param session: Cassandra session connected to the DOMS keyspace
+    :param row_id: Execution id (doms_executions.id)
+    """
+    for statement in _get_delete_statements(session):
+        session.execute(statement, (row_id,))
+
+
+def create_keyspace(session, keyspace):
+    """Create the DOMS keyspace and its tables using SDAP's DomsInitializer.
+
+    :param session: Connected Cassandra session
+    :param keyspace: Name of the keyspace to create
+    """
+    log.info('*** RUNNING DOMS INITIALIZATION ***')
+
+    doms_init = DomsInitializer()
+    # Keyspace before tables: createTables takes no keyspace argument, so presumably it relies on
+    # createKeyspace having selected it — confirm in DomsInitialization
+    doms_init.createKeyspace(session, keyspace)
+    doms_init.createTables(session)
+    initializer.createTables(session)
+
+
+def purge_all_data(session, keyspace):
+    """Drop the whole DOMS keyspace and recreate it with empty tables, after asking for confirmation.
+
+    :param session: Connected Cassandra session
+    :param keyspace: Name of the keyspace to drop and recreate
+
+    Exits the process with status 0 if the user declines.
+    """
+    if not get_confirmation(f'You have selected to purge all data. This will drop and recreate the \'{keyspace}\' '
+                            'keyspace. Continue? [y]/n: '):
+        sys.exit(0)
+
+    cql = f"""
+    drop keyspace {keyspace};
+    """
+
+    log.info('Executing keyspace drop')
+    log.info('NOTE: If something goes wrong with keyspace recreation, rerun this utility with just \'--recreate-ks\''
+             ' and the cassandra auth options')
+    # No client-side timeout: dropping a large keyspace can take longer than the default request timeout
+    session.execute(cql, timeout=None)
+    log.info(f'\'{keyspace}\' keyspace dropped. Recreating it now.')
+    create_keyspace(session, keyspace)
+
+
+def count_executions(session, before, keep_completed, keep_failed, purge_all) -> Tuple[int, List]:
+    """Find the executions matching the purge criteria.
+
+    :param session: Cassandra session connected to the DOMS keyspace
+    :param before: Datetime cutoff (expected naive, in UTC) or None
+    :param keep_completed: Select only uncompleted executions (time_completed is null)
+    :param keep_failed: With a cutoff, select only completed executions
+    :param purge_all: Only count executions; the whole keyspace will be dropped
+    :return: (count, ids). For purge_all, ids is empty because executions are not deleted individually.
+    :raises ValueError: if no purge criterion is selected
+    """
+    if purge_all:
+        cql = """
+        SELECT COUNT (id) FROM doms_executions;
+        """
+
+        return session.execute(cql).one().system_count_id, []
+
+    if before is not None and not keep_failed:
+        # Drop executions completed before the cutoff, plus uncompleted ones that *started* before it (newer
+        # uncompleted executions may still be running). Cassandra can't select null values, so every row is
+        # fetched and checked here.
+        log.info(f'Counting executions before {before} including uncompleted executions')
+
+        def is_expired(row):
+            if row.time_completed is not None:
+                return row.time_completed <= before
+            # Without a start time the execution's age is unknown; keep it rather than guess
+            return row.time_started is not None and row.time_started <= before
+
+        to_delete = [row.id for row in session.execute('SELECT * FROM doms_executions;') if is_expired(row)]
+        return len(to_delete), to_delete
+
+    if before is not None and keep_failed:
+        # Rows with a null time_completed never satisfy the filter, so uncompleted executions are kept
+        log.info(f'Counting executions before {before} excluding uncompleted executions')
+
+        query = session.prepare('SELECT id FROM doms_executions WHERE time_completed<=? ALLOW FILTERING;')
+        to_delete = [row.id for row in session.execute(query, (before,))]
+        return len(to_delete), to_delete
+
+    if keep_completed:
+        # Cassandra can't select null values, so every row is fetched and checked here
+        log.info('Counting ALL uncompleted executions')
+
+        to_delete = [row.id for row in session.execute('SELECT * FROM doms_executions;')
+                     if row.time_completed is None]
+        return len(to_delete), to_delete
+
+    raise ValueError('No purge criteria specified')
+
+
+def parse_args():
+    """Parse the command line and derive the purge criteria.
+
+    Also sets the module-level ``dry_run`` and ``non_interactive`` flags. Invalid or conflicting options
+    are reported through ``parser.error`` (usage message, exit status 2).
+
+    :return: Tuple ``(args, before, keep_completed, keep_failed, purge_all, recreate)`` matching the
+             parameters of ``main``. ``before`` is a naive datetime in UTC, or None when no time cutoff applies.
+    """
+    from datetime import timezone
+
+    parser = argparse.ArgumentParser(description='Purge DOMS data from Cassandra')
+
+    cassandra_args = parser.add_argument_group('Cassandra args')
+    purge_options = parser.add_argument_group('Purge options')
+
+    cassandra_args.add_argument('--cassandra', help='The hostname(s) or IP(s) of the Cassandra server(s).',
+                                required=False,
+                                default=['localhost'],
+                                dest='hosts',
+                                nargs='+',
+                                metavar=('localhost', '127.0.0.101'))
+
+    cassandra_args.add_argument('-k', '--cassandraKeyspace',
+                                help='The Cassandra keyspace for DOMS data.',
+                                default='doms',
+                                dest='keyspace',
+                                required=False,
+                                metavar='DOMS_KEYSPACE')
+
+    cassandra_args.add_argument('--cassandraPort',
+                                help='The port used to connect to Cassandra.',
+                                dest='port',
+                                required=False,
+                                default='9042')
+
+    cassandra_args.add_argument('-u', '--cassandra-username',
+                                dest='username', help='The username used to connect to Cassandra.',
+                                required=True, metavar='USERNAME')
+
+    cassandra_args.add_argument('-p', '--cassandra-password',
+                                dest='password', help='The password used to connect to Cassandra.',
+                                required=True, metavar='PASSWORD')
+
+    cassandra_args.add_argument('--cassandraProtocolVersion',
+                                help='The version of the Cassandra protocol the driver should use.',
+                                required=False,
+                                dest='pv',
+                                choices=['1', '2', '3', '4', '5'],
+                                default='3')
+
+    time_before = purge_options.add_mutually_exclusive_group(required=True)
+
+    def utc_datetime(s):
+        # Cutoffs are compared as naive UTC datetimes (the --before-months path uses utcnow()). Convert any
+        # explicit offset to naive UTC so aware and naive values are never compared (which raises TypeError).
+        try:
+            dt = du_parser.parse(s)
+        except (ValueError, OverflowError) as err:
+            raise argparse.ArgumentTypeError(f'invalid datetime: {s!r}') from err
+
+        if dt.tzinfo is not None:
+            dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
+
+        return dt
+
+    time_before.add_argument('--before',
+                             help='Date & time before which data will be purged. Time entered should be UTC. Do not '
+                                  'specify timezone.',
+                             type=utc_datetime,
+                             dest='before_dt',
+                             metavar='DATETIME',
+                             default=None)
+
+    def num_months(s):
+        # ArgumentTypeError (not ValueError) so argparse shows this message instead of a generic one
+        try:
+            v = int(s)
+        except ValueError as err:
+            raise argparse.ArgumentTypeError(f'invalid number of months: {s!r}') from err
+
+        if v <= 0:
+            raise argparse.ArgumentTypeError('--before-months must be >= 1')
+
+        return v
+
+    time_before.add_argument('--before-months',
+                             help='Drop all data before n months ago',
+                             type=num_months,
+                             dest='before_mo',
+                             metavar='MONTHS',
+                             default=None)
+
+    time_before.add_argument('--keep-completed',
+                             help='Keep all completed executions (only purge failed executions)',
+                             action='store_true',
+                             dest='keep')
+
+    time_before.add_argument('--all',
+                             help='Purge ALL data (drops and re-creates keyspace)',
+                             action='store_true',
+                             dest='all')
+
+    time_before.add_argument('--recreate-ks',
+                             help=argparse.SUPPRESS,
+                             action='store_true',
+                             dest='recreate')
+
+    purge_options.add_argument('--keep-failed',
+                               help='Keep failed executions.',
+                               action='store_true',
+                               dest='keep_failed')
+
+    parser.add_argument('--dry-run',
+                        help='Only print the execution ids to be deleted / DB operations to be performed. Do not '
+                             'actually alter the DB',
+                        action='store_true',
+                        dest='dry_run')
+
+    parser.add_argument('-y', '--yes',
+                        help='Do not ask for confirmation.',
+                        action='store_true',
+                        dest='yes')
+
+    args = parser.parse_args()
+
+    global dry_run
+    global non_interactive
+    dry_run = args.dry_run
+    non_interactive = args.yes
+
+    if args.recreate:
+        return args, None, False, False, False, True
+
+    if args.all:
+        if args.keep_failed:
+            parser.error('Mutually exclusive options (purge all & keep) selected')
+
+        return args, None, False, False, True, False
+
+    if args.keep and args.keep_failed:
+        parser.error('--keep-completed and --keep-failed are set; this will have no effect')
+
+    if args.keep:
+        before = None
+    elif args.before_dt is not None:
+        before = args.before_dt
+    else:
+        # --before-months: cutoff is that many calendar months before the current UTC time
+        before = datetime.utcnow() - relativedelta(months=args.before_mo)
+
+    return args, before, args.keep, args.keep_failed, False, False
+
+
+if __name__ == '__main__':
+    try:
+        main(*parse_args())
+    except Exception:
+        # Top-level boundary: log the traceback once and exit nonzero so schedulers (e.g. a k8s CronJob)
+        # register the failure. sys.exit is used because the site-provided exit() is meant for interactive use.
+        log.exception('An unexpected error occurred...')
+        sys.exit(-1)
diff --git a/tools/domspurge/requirements.txt b/tools/domspurge/requirements.txt
new file mode 100644
index 0000000..4356380
--- /dev/null
+++ b/tools/domspurge/requirements.txt
@@ -0,0 +1,4 @@
+cassandra-driver==3.24.0
+python-dateutil==2.7.5
+six==1.16.0
+tqdm==4.64.1
\ No newline at end of file