Posted to commits@sdap.apache.org by rk...@apache.org on 2024/03/01 20:37:13 UTC

(incubator-sdap-in-situ-data-services) branch master updated: Data validation tool lambda (#24)

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git


The following commit(s) were added to refs/heads/master by this push:
     new f3da62b  Data validation tool lambda (#24)
f3da62b is described below

commit f3da62b0b9189d57970cc0243c2b0bec1a125870
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Fri Mar 1 12:37:09 2024 -0800

    Data validation tool lambda (#24)
    
    * Temporary solution to http --> https redirect issue
    
    * Add data validation tool
    
    * Update comment
    
    * Add output-file option and better commandline output handling
    
    * Help message for env vars
    
    * Option for formatting output
    
    * Make audit tool importable as a function
    
    * SQS and SNS classes
    
    * Audit tool lambda adaptation
    
    * Audit tool lambda adaptation
    
    * Audit tool (attempt) to reinvoke lambda if it is close to timing out
    
    * Works in Lambda now
    
    * Logging
    
    * Optimization: Select objects for auditing that were added after last execution
    
    * Audit tool readme
    
    * Audit tool readme build instructions
    
    * Initialize last list time in state on local run
    
    ---------
    
    Co-authored-by: Jason Min-Liang Kang <ja...@gmail.com>
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
 parquet_cli/audit_tool/README.md                   |  83 +++++
 .../aws => parquet_cli/audit_tool}/__init__.py     |   4 +-
 parquet_cli/audit_tool/audit_tool.py               | 382 +++++++++++++++++++++
 parquet_flask/aws/__init__.py                      |   5 +-
 parquet_flask/aws/{__init__.py => aws_sns.py}      |  20 +-
 parquet_flask/aws/{__init__.py => aws_sqs.py}      |  19 +-
 .../audit_tool}/__init__.py                        |   3 +-
 .../cdms_lambda_func/audit_tool/execute_lambda.py  |  89 +++++
 setup.py                                           |   2 +-
 setup_lambda.py                                    |   2 +-
 10 files changed, 602 insertions(+), 7 deletions(-)

diff --git a/parquet_cli/audit_tool/README.md b/parquet_cli/audit_tool/README.md
new file mode 100644
index 0000000..5d1f3e4
--- /dev/null
+++ b/parquet_cli/audit_tool/README.md
@@ -0,0 +1,83 @@
+# Insitu Stats Audit and Repair
+
+A bug has been observed where Parquet files produced at ingestion are missed by the stats extraction. This is
+problematic because, in addition to the stats endpoint returning incorrect results, the spatiotemporal information in
+stats records is used for subsetting: only the Parquet files with existing stats records are processed at subset time,
+so data appears to be missing.
+
+Until the cause is identified and fixed, this tool can be used to verify that all the Parquet files in the destination
+S3 bucket are present in the stats index. It can either list the missing keys to the console (optionally writing them
+to a file as well), or it can publish them to AWS SQS as mock S3 ObjectCreated events to re-trigger the stats
+extraction Lambda for each missing key. Optionally, it can notify users of missing keys by publishing to an AWS SNS
+topic.
+
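+With `-f mock-s3`, each missing key is wrapped in a minimal S3 ObjectCreated event before it is sent to SQS. A sketch
+of a single message body as built by `key_to_sqs_msg` (bucket and key values are illustrative):
+```
+{
+  "Records": [
+    {
+      "eventName": "ObjectCreated:Put",
+      "s3": {
+        "bucket": {"name": "insitu-parquet-bucket"},
+        "object": {"key": "insitu.parquet/part-00000.parquet"}
+      }
+    }
+  ]
+}
+```
+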
+There are two ways it's designed to be run:
+
+## 1: Locally
+
+1. Clone the repo: `git clone --branch data-validation-tool-lambda https://github.com/RKuttruff/incubator-sdap-in-situ-data-services.git`
+2. `cd incubator-sdap-in-situ-data-services`
+3. Run `python setup_lambda.py install`
+4. `cd parquet_cli/audit_tool`
+5. Usage: 
+```
+(parquet_flask) rileykk@MT-407178 audit_tool % python audit_tool.py -h
+usage: audit_tool.py [-h] [-o [OUTPUT]] [-f {plain,mock-s3}]
+
+Audit parquet files in S3 against records in OpenSearch
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -o [OUTPUT], --output-file [OUTPUT]
+                        file to output the S3 keys of the files that are not found in OpenSearch
+  -f {plain,mock-s3}, --format {plain,mock-s3}
+                        Output format. 'plain' will output keys of missing parquet files to the output file in plain text. 'mock-s3' will output missing keys to SQS (-o is required as the SQS queue URL), formatted as an S3 object created event
+
+Environment variables:
+    AWS_ACCESS_KEY_ID : AWS access key ID for S3 bucket & OpenSearch index
+    AWS_SECRET_ACCESS_KEY : AWS secret access key for S3 bucket & OpenSearch index
+    AWS_REGION : AWS region for S3 bucket & OpenSearch index
+    OPENSEARCH_ENDPOINT : Endpoint for OpenSearch domain
+    OPENSEARCH_PORT : Port to connect to OpenSearch (Default: 443)
+    OPENSEARCH_BUCKET : Name of the bucket storing ingested Parquet files.
+    OPENSEARCH_PATH_PREFIX : Key prefix for objects in OPENSEARCH_BUCKET to audit.
+    OPENSEARCH_ID_PREFIX : S3 URI prefix for the id field in OpenSearch documents. Defaults to 's3://<OPENSEARCH_BUCKET>/'
+    OPENSEARCH_INDEX : OpenSearch index to audit
+```
+Example usage:
+```
+export AWS_ACCESS_KEY_ID=<secret>  
+export AWS_REGION=us-west-2  
+export AWS_SECRET_ACCESS_KEY=<secret>  
+export OPENSEARCH_BUCKET=insitu-parquet-bucket  
+export OPENSEARCH_ENDPOINT=<domain-endpoint>.us-west-2.es.amazonaws.com  
+export OPENSEARCH_ID_PREFIX=s3://insitu-parquet-bucket/  
+export OPENSEARCH_INDEX=parquet_stats_alias  
+export OPENSEARCH_PATH_PREFIX=insitu.parquet/  
+export OPENSEARCH_PORT=443
+
+python audit_tool.py -f mock-s3 -o https://sqs.us-west-2.amazonaws.com/<acct-id>/<queue-name>
+```
+
+## 2: Automated Lambda
+
+Create a Lambda function with the Python 3.8 runtime and x86_64 architecture, with an execution role that can do the following:
+- List, Get, Put, and Delete objects in the target S3 bucket
+- Interact with ES/OpenSearch
+- Invoke Lambda functions
+- Send messages to SQS
+- Optional: publish to SNS
+
+Config:
+- Handler: `parquet_flask.cdms_lambda_func.audit_tool.execute_lambda.execute_code`
+- To build the code .zip, follow the instructions in `parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py` (~ line 33)
+- Timeout: no less than 2 minutes; at least 10 minutes is recommended (the function re-invokes itself to resume if it is close to timing out; see the payload sketch below)
+- Memory: 512 MB
+- Ephemeral storage: 512 MB
+- Environment variables: As with local invocation plus:
+	- `SQS_URL`: URL of queue to publish to
+	- `SNS_ARN`: ARN of SNS topic to publish to
+- Concurrency: 
+	- Use reserved: 1
+
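+When it is close to timing out, the function persists its progress to `AUDIT_STATE.json` in the target S3 bucket and
+re-invokes itself with a payload that points at that object. A sketch of the self-invocation payload (bucket name is
+illustrative):
+```
+{"State": {"Bucket": "insitu-parquet-bucket", "Key": "AUDIT_STATE.json"}}
+```
+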
+Once configured/tested, you can use AWS EventBridge to trigger the function on a regular schedule.
\ No newline at end of file
diff --git a/parquet_flask/aws/__init__.py b/parquet_cli/audit_tool/__init__.py
similarity index 89%
copy from parquet_flask/aws/__init__.py
copy to parquet_cli/audit_tool/__init__.py
index fa8ccd5..25f57c9 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_cli/audit_tool/__init__.py
@@ -11,4 +11,6 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
+from parquet_cli.audit_tool.audit_tool import audit
diff --git a/parquet_cli/audit_tool/audit_tool.py b/parquet_cli/audit_tool/audit_tool.py
new file mode 100755
index 0000000..ea9db19
--- /dev/null
+++ b/parquet_cli/audit_tool/audit_tool.py
@@ -0,0 +1,382 @@
+########################################################################################################################
+# Description: This script is used to audit Parquet files in S3 against records in OpenSearch. It will print out the   #
+#              S3 keys of the files that are not found in OpenSearch.                                                  #
+#                                                                                                                      #
+# Usage:                                                                                                               #
+#   python audit_tool.py [ -o <output_filename> ]                                                                      #
+#                                                                                                                      #
+# Environment variables:                                                                                               #
+#   AWS_ACCESS_KEY_ID: AWS access key ID                                                                               #
+#   AWS_SECRET_ACCESS_KEY: AWS secret access key                                                                       #
+#   OPENSEARCH_ENDPOINT: OpenSearch endpoint                                                                           #
+#   OPENSEARCH_PORT: OpenSearch port                                                                                   #
+#   OPENSEARCH_ID_PREFIX: OpenSearch ID prefix e.g. s3://cdms-dev-in-situ-parquet                                      #
+#   OPENSEARCH_INDEX: OpenSearch index e.g. [ parquet_stats_alias | entry_file_records_alias ]                         #
+#   OPENSEARCH_PATH_PREFIX: OpenSearch path prefix (use '' if no prefix needed)                                        #
+#   OPENSEARCH_BUCKET: OpenSearch bucket                                                                               #
+########################################################################################################################
+
+
+import boto3
+from requests_aws4auth import AWS4Auth
+from elasticsearch import Elasticsearch, RequestsHttpConnection, NotFoundError
+import os
+import sys
+import argparse
+import textwrap
+import json
+import logging
+from tempfile import NamedTemporaryFile
+from datetime import timedelta, datetime, timezone
+
+from parquet_flask.aws import AwsSQS, AwsSNS
+from parquet_flask.cdms_lambda_func.lambda_logger_generator import LambdaLoggerGenerator
+
+
+# logging.basicConfig(
+#     level=logging.INFO,
+#     format=
+# )
+
+LambdaLoggerGenerator.remove_default_handlers()
+logger = LambdaLoggerGenerator.get_logger(
+    __name__,
+    log_format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s'
+)
+LambdaLoggerGenerator.get_logger('elasticsearch', log_level=logging.WARNING)
+
+
+PHASES = dict(
+    start=0,
+    list=1,
+    audit=2
+)
+
+
+# Append a slash to the end of a string if it doesn't already have one
+def append_slash(string: str):
+    if string is None:
+        return None
+    elif string == '':
+        return string
+    elif string[-1] != '/':
+        return string + '/'
+    else:
+        return string
+
+
+def key_to_sqs_msg(key: str, bucket: str):
+    s3_event = {
+        'Records': [
+            {
+                'eventName': 'ObjectCreated:Put',
+                's3': {
+                    'bucket': {'name': bucket},
+                    'object': {'key': key}
+                }
+            }
+        ]
+    }
+
+    sqs_body = json.dumps(s3_event)
+
+    return sqs_body
+
+
+def reinvoke(state, bucket, s3_client, lambda_client, function_name):
+    state['lastListTime'] = state['lastListTime'].strftime("%Y-%m-%dT%H:%M:%S%z")
+
+    logger.info('Preparing to reinvoke. Persisting audit state to S3')
+
+    object_data = json.dumps(state).encode('utf-8')
+
+    s3_client.put_object(Bucket=bucket, Key='AUDIT_STATE.json', Body=object_data)
+
+    response = lambda_client.invoke(
+        FunctionName=function_name,
+        InvocationType='Event',
+        Payload=json.dumps({"State": {"Bucket": bucket, "Key": "AUDIT_STATE.json"}})
+    )
+
+    logger.info(f'Lambda response: {repr(response)}')
+
+
+def audit(format='plain', output_file=None, sns_topic=None, state=None, lambda_ctx=None):
+    if format == 'mock-s3' and not output_file:
+        raise ValueError('Output file MUST be defined with mock-s3 output format')
+
+    if state is None:
+        state = {}
+
+    # Check if AWS credentials are set
+    AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID', None)
+    AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', None)
+    AWS_SESSION_TOKEN = os.environ.get('AWS_SESSION_TOKEN', None)
+    if AWS_ACCESS_KEY_ID is None or AWS_SECRET_ACCESS_KEY is None:
+        logger.error('AWS credentials are not set. Please set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.')
+        exit(1)
+
+    # Check if OpenSearch parameters are set
+    OPENSEARCH_ENDPOINT = os.environ.get('OPENSEARCH_ENDPOINT', None)
+    OPENSEARCH_PORT = os.environ.get('OPENSEARCH_PORT', 443)
+    OPENSEARCH_INDEX = os.environ.get('OPENSEARCH_INDEX', None)
+    OPENSEARCH_PATH_PREFIX = append_slash(os.environ.get('OPENSEARCH_PATH_PREFIX', None))
+    OPENSEARCH_BUCKET = os.environ.get('OPENSEARCH_BUCKET', None)
+    OPENSEARCH_ID_PREFIX = append_slash(os.environ.get('OPENSEARCH_ID_PREFIX', f's3://{OPENSEARCH_BUCKET}/'))
+    if OPENSEARCH_ENDPOINT is None or OPENSEARCH_PORT is None or OPENSEARCH_ID_PREFIX is None or OPENSEARCH_INDEX is None or OPENSEARCH_PATH_PREFIX is None or OPENSEARCH_BUCKET is None:
+        logger.error('OpenSearch parameters are not set. Please set OPENSEARCH_ENDPOINT, OPENSEARCH_PORT, OPENSEARCH_ID_PREFIX, OPENSEARCH_INDEX, OPENSEARCH_PATH_PREFIX, OPENSEARCH_BUCKET.')
+        exit(1)
+
+    # AWS session
+    aws_session_param = {}
+    aws_session_param['aws_access_key_id'] = AWS_ACCESS_KEY_ID
+    aws_session_param['aws_secret_access_key'] = AWS_SECRET_ACCESS_KEY
+
+    if AWS_SESSION_TOKEN:
+        aws_session_param['aws_session_token'] = AWS_SESSION_TOKEN
+
+    aws_session = boto3.Session(**aws_session_param)
+
+    # AWS auth
+    aws_auth = AWS4Auth(
+        aws_session.get_credentials().access_key,
+        aws_session.get_credentials().secret_key,
+        aws_session.region_name,
+        'es',
+        session_token=aws_session.get_credentials().token
+    )
+
+    # S3 paginator
+    s3 = aws_session.client('s3')
+    lambda_client = aws_session.client('lambda')
+
+    phase = state.get('state', 'start')
+    phase = PHASES[phase]
+    marker = state.get('marker')
+    keys: list = state.get('keys', [])
+
+    opensearch_client = Elasticsearch(
+        hosts=[{'host': OPENSEARCH_ENDPOINT, 'port': OPENSEARCH_PORT}],
+        http_auth=aws_auth,
+        use_ssl=True,
+        verify_certs=True,
+        connection_class=RequestsHttpConnection
+    )
+
+    # Go through all files in a bucket
+    count = 0
+    error_count = 0
+    error_s3_keys = []
+    missing_keys = []
+    logger.info('processing... will print out S3 keys for which no match is found...')
+    if phase < 2:
+        if phase == 0:
+            logger.info(f'Starting listing of bucket {OPENSEARCH_BUCKET}')
+            state['listStartTime'] = datetime.now(timezone.utc)
+        else:
+            logger.info(f'Resuming listing of bucket {OPENSEARCH_BUCKET}')
+
+        logger.info(f'Listing objects modified since {state["lastListTime"].strftime("%Y-%m-%dT%H:%M:%S%z")}')
+
+        while True:
+            list_kwargs = dict(
+                Bucket=OPENSEARCH_BUCKET,
+                Prefix=OPENSEARCH_PATH_PREFIX,
+                MaxKeys=1000
+            )
+
+            if marker:
+                list_kwargs['ContinuationToken'] = marker
+
+            page = s3.list_objects_v2(**list_kwargs)
+            keys_to_add = []
+
+            for key in page.get('Contents', []):
+                if key['Key'].endswith('parquet') and key['LastModified'] >= state['lastListTime']:
+                    keys_to_add.append(key['Key'])
+
+            keys.extend(keys_to_add)
+
+            logger.info(f"Listed page of {len(page.get('Contents', [])):,} objects; selected {len(keys_to_add):,}; "
+                        f"total={len(keys):,}")
+
+            if lambda_ctx:
+                remaining_time = timedelta(milliseconds=lambda_ctx.get_remaining_time_in_millis())
+                logger.info(f'Remaining time: {remaining_time}')
+
+            if not page['IsTruncated']:
+                break
+            else:
+                marker = page['NextContinuationToken']
+
+            if lambda_ctx is not None and lambda_ctx.get_remaining_time_in_millis() < (60 * 1000):
+                logger.warning('Lambda is about to time out, re-invoking to resume from this key')
+
+                state = dict(
+                    state='list',
+                    marker=marker,
+                    keys=keys,
+                    listStartTime=state['lastListTime']
+                )
+
+                reinvoke(state, OPENSEARCH_BUCKET, s3, lambda_client, lambda_ctx.function_name)
+                return
+
+        state['lastListTime'] = state['listStartTime']
+        del state['listStartTime']
+
+    # if phase == 3 and marker is not None and marker in keys:
+    #     logger.info(f'Resuming audit from key {marker}')
+    #     index = keys.index(marker)
+    # else:
+    #     logger.info('Starting audit from the beginning')
+    #     index = 0
+    #
+    # keys = keys[index:]
+
+    n_keys = len(keys)
+
+    logger.info(f'Beginning audit on {n_keys:,} keys...')
+
+    need_to_resume = False
+
+    while len(keys) > 0:
+        key = keys.pop(0)
+        count += 1
+        try:
+            # Search key in opensearch
+            opensearch_id = OPENSEARCH_ID_PREFIX + key
+            opensearch_response = opensearch_client.get(index=OPENSEARCH_INDEX, id=opensearch_id)
+            if opensearch_response is None or not type(opensearch_response) is dict or not opensearch_response['found']:
+                error_count += 1
+                error_s3_keys.append(key)
+                sys.stdout.write("\x1b[2K")
+                logger.info(key)
+                missing_keys.append(key)
+        except NotFoundError as e:
+            error_count += 1
+            error_s3_keys.append(key)
+            sys.stdout.write("\x1b[2K")
+            logger.info(key)
+            missing_keys.append(key)
+        except Exception as e:
+            error_count += 1
+
+        if count % 50 == 0:
+            logger.info(f'Checked {count} files [{(count/n_keys)*100:7.3f}%]')
+            if lambda_ctx:
+                remaining_time = timedelta(milliseconds=lambda_ctx.get_remaining_time_in_millis())
+                logger.info(f'Remaining time: {remaining_time}')
+
+        if lambda_ctx is not None and lambda_ctx.get_remaining_time_in_millis() < (60 * 1000):
+            logger.warning('Lambda is about to time out, re-invoking to resume from this key')
+
+            state = dict(
+                state='audit',
+                marker=key,
+                keys=keys,
+                lastListTime=state['lastListTime']
+            )
+
+            need_to_resume = True
+
+            break
+
+    logger.info(f'Checked {count} files')
+    logger.info(f'Found {len(missing_keys):,} missing keys')
+
+    if len(missing_keys) > 0:
+        if format == 'plain':
+            if output_file:
+                with open(output_file, 'w') as f:
+                    for key in missing_keys:
+                        f.write(key + '\n')
+            else:
+                logger.info('Not writing to file as none was given')
+        else:
+            sqs_messages = [key_to_sqs_msg(k, OPENSEARCH_BUCKET) for k in missing_keys]
+
+            sqs = AwsSQS()
+
+            sqs_response = None
+
+            for m in sqs_messages:
+                sqs_response = sqs.send_message(output_file, m)
+
+            logger.info(f'SQS response: {repr(sqs_response)}')
+
+            if sns_topic:
+                sns = AwsSNS()
+
+                sns_response = sns.publish(
+                    sns_topic,
+                    f'Parquet stats audit found {len(missing_keys):,} missing keys. Trying to republish to SQS.',
+                    'Insitu audit'
+                )
+
+                logger.info(f'SNS response: {repr(sns_response)}')
+
+    if need_to_resume:
+        reinvoke(state, OPENSEARCH_BUCKET, s3, lambda_client, lambda_ctx.function_name)
+        return
+
+    # Finished, reset state to just last list time
+
+    logger.info('Audit complete! Persisting state to S3')
+
+    state = {'lastListTime': state['lastListTime'].strftime("%Y-%m-%dT%H:%M:%S%z")}
+
+    object_data = json.dumps(state).encode('utf-8')
+
+    s3.put_object(Bucket=OPENSEARCH_BUCKET, Key='AUDIT_STATE.json', Body=object_data)
+
+
+if __name__ == '__main__':
+    # Parse arguments
+    parser = argparse.ArgumentParser(
+        description='Audit parquet files in S3 against records in OpenSearch',
+        epilog=textwrap.dedent('''\
+            Environment variables:
+                AWS_ACCESS_KEY_ID : AWS access key ID for S3 bucket & OpenSearch index
+                AWS_SECRET_ACCESS_KEY : AWS secret access key for S3 bucket & OpenSearch index
+                AWS_REGION : AWS region for S3 bucket & OpenSearch index
+                OPENSEARCH_ENDPOINT : Endpoint for OpenSearch domain
+                OPENSEARCH_PORT : Port to connect to OpenSearch (Default: 443)
+                OPENSEARCH_BUCKET : Name of the bucket storing ingested Parquet files.
+                OPENSEARCH_PATH_PREFIX : Key prefix for objects in OPENSEARCH_BUCKET to audit.
+                OPENSEARCH_ID_PREFIX : S3 URI prefix for the id field in OpenSearch documents. Defaults to 's3://<OPENSEARCH_BUCKET>/'
+                OPENSEARCH_INDEX : OpenSearch index to audit
+        '''),
+        formatter_class=argparse.RawDescriptionHelpFormatter
+    )
+    parser.add_argument(
+        '-o', '--output-file',
+        nargs='?',
+        type=str,
+        help='file to output the S3 keys of the files that are not found in OpenSearch',
+        dest='output'
+    )
+
+    parser.add_argument(
+        '-f', '--format',
+        choices=['plain', 'mock-s3'],
+        default='plain',
+        dest='format',
+        help='Output format. \'plain\' will output keys of missing parquet files to the output file in plain text. '
+             '\'mock-s3\' will output missing keys to SQS (-o is required as the SQS queue URL), formatted as an S3 '
+             'object created event'
+    )
+
+    def utcfromisoformat(s):
+        return datetime.fromisoformat(s).astimezone(timezone.utc)
+
+    parser.add_argument(
+        '--from-time',
+        type=utcfromisoformat,
+        default=datetime(1970, 1, 1, tzinfo=timezone.utc),
+        dest='llt',
+        help='Check all objects newer than this time as ISO datetime string. Default: 1970-01-01'
+    )
+
+    args = parser.parse_args()
+    audit(args.format, args.output, state=dict(lastListTime=args.llt))
diff --git a/parquet_flask/aws/__init__.py b/parquet_flask/aws/__init__.py
index fa8ccd5..e47d525 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_flask/aws/__init__.py
@@ -11,4 +11,7 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
+from parquet_flask.aws.aws_sqs import AwsSQS
+from parquet_flask.aws.aws_sns import AwsSNS
diff --git a/parquet_flask/aws/__init__.py b/parquet_flask/aws/aws_sns.py
similarity index 61%
copy from parquet_flask/aws/__init__.py
copy to parquet_flask/aws/aws_sns.py
index fa8ccd5..3062078 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_flask/aws/aws_sns.py
@@ -11,4 +11,22 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
+import logging
+from parquet_flask.aws.aws_cred import AwsCred
+
+LOGGER = logging.getLogger(__name__)
+
+
+class AwsSNS(AwsCred):
+    def __init__(self):
+        super().__init__()
+        self.__sns_client = self.get_client('sns')
+
+    def publish(self, topic_arn: str, message: str, subject: str):
+        return self.__sns_client.publish(
+            TopicArn=topic_arn,
+            Message=message,
+            Subject=subject
+        )
diff --git a/parquet_flask/aws/__init__.py b/parquet_flask/aws/aws_sqs.py
similarity index 62%
copy from parquet_flask/aws/__init__.py
copy to parquet_flask/aws/aws_sqs.py
index fa8ccd5..b63db68 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_flask/aws/aws_sqs.py
@@ -11,4 +11,21 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
+import logging
+from parquet_flask.aws.aws_cred import AwsCred
+
+LOGGER = logging.getLogger(__name__)
+
+
+class AwsSQS(AwsCred):
+    def __init__(self):
+        super().__init__()
+        self.__sqs_client = self.get_client('sqs')
+
+    def send_message(self, queue_url: str, message: str):
+        return self.__sqs_client.send_message(
+            QueueUrl=queue_url,
+            MessageBody=message
+        )
diff --git a/parquet_flask/aws/__init__.py b/parquet_flask/cdms_lambda_func/audit_tool/__init__.py
similarity index 95%
copy from parquet_flask/aws/__init__.py
copy to parquet_flask/cdms_lambda_func/audit_tool/__init__.py
index fa8ccd5..8afd240 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_flask/cdms_lambda_func/audit_tool/__init__.py
@@ -11,4 +11,5 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
diff --git a/parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py b/parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py
new file mode 100644
index 0000000..7e0568b
--- /dev/null
+++ b/parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from parquet_cli.audit_tool import audit
+import os
+import json
+import logging
+import traceback
+from io import BytesIO
+
+from datetime import datetime, timezone
+
+import boto3
+
+# logging.basicConfig(
+#     level=logging.INFO,
+#     format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s'
+# )
+
+
+# Build process:
+# 1: cd to incubator-sdap-in-situ-data-services/parquet_flask/cdms_lambda_func/audit_tool
+# 2: mkdir package
+# 3: rename incubator-sdap-in-situ-data-services/setup_lambda.py to incubator-sdap-in-situ-data-services/setup.py
+# 4: pip install --target ./package ../../..
+# 5: cd package
+# 6: zip -r ../audit_lambda.zip .
+# 7: Upload zip to lambda
+# 8: Clean up
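+#
+# A sketch of the above as shell commands (assuming the repository root is the current working directory):
+#   cd parquet_flask/cdms_lambda_func/audit_tool
+#   mkdir package
+#   mv ../../../setup_lambda.py ../../../setup.py
+#   pip install --target ./package ../../..
+#   cd package
+#   zip -r ../audit_lambda.zip .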
+
+
+def execute_code(event, context):
+    state = None
+    s3 = boto3.client('s3')
+
+    if event:
+        if isinstance(event, str):
+            event = json.loads(event)
+
+        if 'State' in event:
+            buf = BytesIO()
+
+            s3.download_fileobj(event['State']['Bucket'], event['State']['Key'], buf)
+            buf.seek(0)
+            state = json.load(buf)
+
+            print('Loaded persisted state from S3', flush=True)
+
+    if state is None:
+        try:
+            buf = BytesIO()
+
+            s3.download_fileobj(os.getenv('OPENSEARCH_BUCKET'), 'AUDIT_STATE.json', buf)
+            buf.seek(0)
+            state = json.load(buf)
+
+            print('Loaded persisted state from S3', flush=True)
+        except Exception:
+            print('Could not load state, starting from scratch', flush=True)
+            traceback.print_exc()
+            state = {}
+
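+    # The persisted AUDIT_STATE.json is a small JSON document; a minimal example (value is illustrative):
+    #   {"lastListTime": "2024-03-01T00:00:00+0000"}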
+    if 'lastListTime' not in state:
+        state['lastListTime'] = datetime(1, 1, 1, tzinfo=timezone.utc)
+    else:
+        state['lastListTime'] = datetime.strptime(state['lastListTime'], "%Y-%m-%dT%H:%M:%S%z")
+
+    print('INVOKING AUDIT TOOL', flush=True)
+    print('', flush=True)
+
+    audit(
+        'mock-s3',
+        os.getenv('SQS_URL'),
+        os.getenv('SNS_ARN'),
+        state=state,
+        lambda_ctx=context
+    )
diff --git a/setup.py b/setup.py
index 4ec925c..1380592 100644
--- a/setup.py
+++ b/setup.py
@@ -50,7 +50,7 @@ setup(
     install_requires=install_requires,
     author="Apache SDAP",
     author_email="dev@sdap.apache.org",
-    python_requires="==3.7",
+    python_requires=">=3.7",
     license='NONE',
     include_package_data=True,
 )
diff --git a/setup_lambda.py b/setup_lambda.py
index f06b623..1e0e6a4 100644
--- a/setup_lambda.py
+++ b/setup_lambda.py
@@ -34,7 +34,7 @@ setup(
     install_requires=install_requires,
     author="Apache SDAP",
     author_email="dev@sdap.apache.org",
-    python_requires="==3.7",
+    python_requires=">=3.7",
     license='NONE',
     include_package_data=True,
 )