Posted to commits@sdap.apache.org by rk...@apache.org on 2024/03/01 20:37:13 UTC
(incubator-sdap-in-situ-data-services) branch master updated: Data validation tool lambda (#24)
This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git
The following commit(s) were added to refs/heads/master by this push:
new f3da62b Data validation tool lambda (#24)
f3da62b is described below
commit f3da62b0b9189d57970cc0243c2b0bec1a125870
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Fri Mar 1 12:37:09 2024 -0800
Data validation tool lambda (#24)
* Temporary solution to http --> https redirect issue
* Add data validation tool
* Update comment
* Add output-file option and better commandline output handling
* Help message for env vars
* Option for formatting output
* Make audit tool importable as a function
* SQS and SNS classes
* Audit tool lambda adaptation
* Audit tool lambda adaptation
* Audit tool (attempt) to reinvoke lambda if it is close to timing out
* Works in Lambda now
* Logging
* Optimization: Select objects for auditing that were added after last execution
* Audit tool readme
* Audit tool readme build instructions
* Initialize last list time in state on local run
---------
Co-authored-by: Jason Min-Liang Kang <ja...@gmail.com>
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
parquet_cli/audit_tool/README.md | 83 +++++
.../aws => parquet_cli/audit_tool}/__init__.py | 4 +-
parquet_cli/audit_tool/audit_tool.py | 382 +++++++++++++++++++++
parquet_flask/aws/__init__.py | 5 +-
parquet_flask/aws/{__init__.py => aws_sns.py} | 20 +-
parquet_flask/aws/{__init__.py => aws_sqs.py} | 19 +-
.../audit_tool}/__init__.py | 3 +-
.../cdms_lambda_func/audit_tool/execute_lambda.py | 89 +++++
setup.py | 2 +-
setup_lambda.py | 2 +-
10 files changed, 602 insertions(+), 7 deletions(-)
diff --git a/parquet_cli/audit_tool/README.md b/parquet_cli/audit_tool/README.md
new file mode 100644
index 0000000..5d1f3e4
--- /dev/null
+++ b/parquet_cli/audit_tool/README.md
@@ -0,0 +1,83 @@
+# Insitu Stats Audit and Repair
+
+A bug has been observed where the Parquet files produced at ingestion are missed by the stats extraction. This is
+problematic because, beyond making the stats endpoint incorrect, the spatiotemporal information in stats records drives
+subsetting: only the Parquet files with extant stats records are processed at subset time, so the affected data appears
+to be missing.
+
+Until the cause is identified and fixed, this tool can be used to verify that all the Parquet files in the destination
+S3 bucket are present in the stats index. It can either list the missing keys to the console (optionally also writing
+them to a file), or publish them to AWS SQS as mock-S3 ObjectCreated events to re-trigger the stats extraction Lambda
+for each missing key (an example payload is shown below). Optionally, it can notify users of missing keys by publishing
+to an AWS SNS topic.
+
+It is designed to be run in two ways:
+
+## 1: Locally
+
+1. Clone the repo: `git clone --branch data-validation-tool-lambda https://github.com/RKuttruff/incubator-sdap-in-situ-data-services.git`
+2. `cd incubator-sdap-in-situ-data-services`
+3. Run `python setup_lambda.py install`
+4. `cd parquet_cli/audit_tool`
+5. Usage:
+```
+(parquet_flask) rileykk@MT-407178 audit_tool % python audit_tool.py -h
+usage: audit_tool.py [-h] [-o [OUTPUT]] [-f {plain,mock-s3}]
+
+Audit parquet files in S3 against records in OpenSearch
+
+optional arguments:
+ -h, --help show this help message and exit
+ -o [OUTPUT], --output-file [OUTPUT]
+ file to output the S3 keys of the files that are not found in OpenSearch
+ -f {plain,mock-s3}, --format {plain,mock-s3}
+ Output format. 'plain' will output keys of missing parquet files to the output file in plain text. 'mock-s3' will output missing keys to SQS (-o is required as the SQS queue URL), formatted as an S3 object created event
+
+Environment variables:
+ AWS_ACCESS_KEY_ID : AWS access key ID for S3 bucket & OpenSearch index
+ AWS_SECRET_ACCESS_KEY : AWS secret access key for S3 bucket & OpenSearch index
+ AWS_REGION : AWS region for S3 bucket & OpenSearch index
+ OPENSEARCH_ENDPOINT : Endpoint for OpenSearch domain
+ OPENSEARCH_PORT : Port to connect to OpenSearch (Default: 443)
+ OPENSEARCH_BUCKET : Name of the bucket storing ingested Parquet files.
+ OPENSEARCH_PATH_PREFIX : Key prefix for objects in OPENSEARCH_BUCKET to audit.
+ OPENSEARCH_ID_PREFIX : S3 URI prefix for the id field in OpenSearch documents. Defaults to 's3://<OPENSEARCH_BUCKET>/'
+ OPENSEARCH_INDEX : OpenSearch index to audit
+```
+Example usage:
+```
+export AWS_ACCESS_KEY_ID=<secret>
+export AWS_REGION=us-west-2
+export AWS_SECRET_ACCESS_KEY=<secret>
+export OPENSEARCH_BUCKET=insitu-parquet-bucket
+export OPENSEARCH_ENDPOINT=<domain-endpoint>.us-west-2.es.amazonaws.com
+export OPENSEARCH_ID_PREFIX=s3://insitu-parquet-bucket/
+export OPENSEARCH_INDEX=parquet_stats_alias
+export OPENSEARCH_PATH_PREFIX=insitu.parquet/
+export OPENSEARCH_PORT=443
+
+python audit_tool.py -f mock-s3 -o https://sqs.us-west-2.amazonaws.com/<acct-id>/<queue-name>
+```
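+
+With `-f mock-s3`, each missing key is wrapped in a minimal mock of an S3 `ObjectCreated:Put` event and sent to the
+queue given with `-o`. This mirrors the `key_to_sqs_msg` helper in `parquet_cli/audit_tool/audit_tool.py`; the bucket
+and key values below are illustrative:
+```
+{
+  "Records": [
+    {
+      "eventName": "ObjectCreated:Put",
+      "s3": {
+        "bucket": {"name": "insitu-parquet-bucket"},
+        "object": {"key": "insitu.parquet/example.parquet"}
+      }
+    }
+  ]
+}
+```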
+
+## 2: Automated Lambda
+
+Create a Lambda function with the Python 3.8 runtime and x86_64 architecture, with an execution role that can do the
+following (a sketch of a matching IAM policy is shown after this list):
+- List, Get, Put, and Delete objects in the target S3 bucket
+- Interact with ES/OpenSearch
+- Invoke Lambda functions
+- Send SQS messages
+- Optional: publish to SNS
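+
+A minimal sketch of a matching IAM policy (bucket, region, account, and resource names are illustrative; scope them to
+your own resources):
+```
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {"Effect": "Allow",
+     "Action": ["s3:ListBucket", "s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
+     "Resource": ["arn:aws:s3:::insitu-parquet-bucket", "arn:aws:s3:::insitu-parquet-bucket/*"]},
+    {"Effect": "Allow", "Action": "es:ESHttp*",
+     "Resource": "arn:aws:es:us-west-2:<acct-id>:domain/<domain-name>/*"},
+    {"Effect": "Allow", "Action": "lambda:InvokeFunction",
+     "Resource": "arn:aws:lambda:us-west-2:<acct-id>:function:<audit-function-name>"},
+    {"Effect": "Allow", "Action": "sqs:SendMessage",
+     "Resource": "arn:aws:sqs:us-west-2:<acct-id>:<queue-name>"},
+    {"Effect": "Allow", "Action": "sns:Publish",
+     "Resource": "arn:aws:sns:us-west-2:<acct-id>:<topic-name>"}
+  ]
+}
+```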
+
+Config:
+- Handler: `parquet_flask.cdms_lambda_func.audit_tool.execute_lambda.execute_code`
+- To build the code .zip, follow the instructions in `parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py` (~ line 33)
+- Timeout: no less than 2 minutes; at least 10 minutes is recommended
+- Memory: 512 MB
+- Ephemeral storage: 512 MB
+- Environment variables: as with local invocation, plus:
+ - `SQS_URL`: URL of queue to publish to
+ - `SNS_ARN`: ARN of SNS topic to publish to
+- Concurrency: use a reserved concurrency of 1 so that only one audit runs at a time
+Once configured/tested, you can use AWS EventBridge to trigger the function on a regular schedule.
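+
+For example, a daily schedule could be wired up with the AWS CLI (rule, function, and account identifiers below are
+illustrative):
+```
+aws events put-rule --name insitu-audit-schedule --schedule-expression 'rate(1 day)'
+aws lambda add-permission --function-name <audit-function-name> --statement-id insitu-audit-schedule \
+    --action lambda:InvokeFunction --principal events.amazonaws.com \
+    --source-arn arn:aws:events:us-west-2:<acct-id>:rule/insitu-audit-schedule
+aws events put-targets --rule insitu-audit-schedule --targets 'Id=audit-lambda,Arn=<function-arn>'
+```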
\ No newline at end of file
diff --git a/parquet_flask/aws/__init__.py b/parquet_cli/audit_tool/__init__.py
similarity index 89%
copy from parquet_flask/aws/__init__.py
copy to parquet_cli/audit_tool/__init__.py
index fa8ccd5..25f57c9 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_cli/audit_tool/__init__.py
@@ -11,4 +11,6 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
+from parquet_cli.audit_tool.audit_tool import audit
diff --git a/parquet_cli/audit_tool/audit_tool.py b/parquet_cli/audit_tool/audit_tool.py
new file mode 100755
index 0000000..ea9db19
--- /dev/null
+++ b/parquet_cli/audit_tool/audit_tool.py
@@ -0,0 +1,382 @@
+########################################################################################################################
+# Description: This script audits the Parquet files in S3 against records in OpenSearch. It prints out the S3 keys of #
+# the files that are not found in OpenSearch. #
+# #
+# Usage: #
+# python audit_tool.py [ -o <output_file> ] [ -f {plain,mock-s3} ] [ --from-time <ISO datetime> ] #
+# #
+# Environment variables: #
+# AWS_ACCESS_KEY_ID: AWS access key ID #
+# AWS_SECRET_ACCESS_KEY: AWS secret access key #
+# OPENSEARCH_ENDPOINT: OpenSearch endpoint #
+# OPENSEARCH_PORT: OpenSearch port (default: 443) #
+# OPENSEARCH_ID_PREFIX: OpenSearch ID prefix, e.g. s3://cdms-dev-in-situ-parquet #
+# OPENSEARCH_INDEX: OpenSearch index, e.g. [ parquet_stats_alias | entry_file_records_alias ] #
+# OPENSEARCH_PATH_PREFIX: OpenSearch path prefix (use '' if no prefix needed) #
+# OPENSEARCH_BUCKET: Name of the S3 bucket storing the Parquet files to audit #
+########################################################################################################################
+
+
+import boto3
+from requests_aws4auth import AWS4Auth
+from elasticsearch import Elasticsearch, RequestsHttpConnection, NotFoundError
+import os
+import sys
+import argparse
+import textwrap
+import json
+import logging
+from tempfile import NamedTemporaryFile
+from datetime import timedelta, datetime, timezone
+
+from parquet_flask.aws import AwsSQS, AwsSNS
+from parquet_flask.cdms_lambda_func.lambda_logger_generator import LambdaLoggerGenerator
+
+
+# logging.basicConfig(
+# level=logging.INFO,
+# format=
+# )
+
+LambdaLoggerGenerator.remove_default_handlers()
+logger = LambdaLoggerGenerator.get_logger(
+ __name__,
+ log_format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s'
+)
+LambdaLoggerGenerator.get_logger('elasticsearch', log_level=logging.WARNING)
+
+
+PHASES = dict(
+ start=0,
+ list=1,
+ audit=2
+)
+
+
+# Append a slash to the end of a string if it doesn't already have one
+def append_slash(string: str):
+ if string is None:
+ return None
+ elif string == '':
+ return string
+ elif string[-1] != '/':
+ return string + '/'
+ else:
+ return string
+
+
+def key_to_sqs_msg(key: str, bucket: str):
+ s3_event = {
+ 'Records': [
+ {
+ 'eventName': 'ObjectCreated:Put',
+ 's3': {
+ 'bucket': {'name': bucket},
+ 'object': {'key': key}
+ }
+ }
+ ]
+ }
+
+ sqs_body = json.dumps(s3_event)
+
+ return sqs_body
+
+
+def reinvoke(state, bucket, s3_client, lambda_client, function_name):
+ # Serialize any datetime fields so the state dict is JSON-safe
+ for time_key in ('lastListTime', 'listStartTime'):
+ if time_key in state and isinstance(state[time_key], datetime):
+ state[time_key] = state[time_key].strftime("%Y-%m-%dT%H:%M:%S%z")
+
+ logger.info('Preparing to reinvoke. Persisting audit state to S3')
+
+ object_data = json.dumps(state).encode('utf-8')
+
+ s3_client.put_object(Bucket=bucket, Key='AUDIT_STATE.json', Body=object_data)
+
+ response = lambda_client.invoke(
+ FunctionName=function_name,
+ InvocationType='Event',
+ Payload=json.dumps({"State": {"Bucket": bucket, "Key": "AUDIT_STATE.json"}})
+ )
+
+ logger.info(f'Lambda response: {repr(response)}')
+
+
+def audit(format='plain', output_file=None, sns_topic=None, state=None, lambda_ctx=None):
+ if format == 'mock-s3' and not output_file:
+ raise ValueError('Output file MUST be defined with mock-s3 output format')
+
+ if state is None:
+ state = {}
+
+ # Check if AWS credentials are set
+ AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID', None)
+ AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', None)
+ AWS_SESSION_TOKEN = os.environ.get('AWS_SESSION_TOKEN', None)
+ if AWS_ACCESS_KEY_ID is None or AWS_SECRET_ACCESS_KEY is None:
+ logger.error('AWS credentials are not set. Please set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.')
+ sys.exit(1)
+
+ # Check if OpenSearch parameters are set
+ OPENSEARCH_ENDPOINT = os.environ.get('OPENSEARCH_ENDPOINT', None)
+ OPENSEARCH_PORT = os.environ.get('OPENSEARCH_PORT', 443)
+ OPENSEARCH_INDEX = os.environ.get('OPENSEARCH_INDEX', None)
+ OPENSEARCH_PATH_PREFIX = append_slash(os.environ.get('OPENSEARCH_PATH_PREFIX', None))
+ OPENSEARCH_BUCKET = os.environ.get('OPENSEARCH_BUCKET', None)
+ OPENSEARCH_ID_PREFIX = append_slash(os.environ.get('OPENSEARCH_ID_PREFIX', f's3://{OPENSEARCH_BUCKET}/'))
+ if OPENSEARCH_ENDPOINT is None or OPENSEARCH_PORT is None or OPENSEARCH_ID_PREFIX is None or OPENSEARCH_INDEX is None or OPENSEARCH_PATH_PREFIX is None or OPENSEARCH_BUCKET is None:
+ logger.error('OpenSearch parameters are not set. Please set OPENSEARCH_ENDPOINT, OPENSEARCH_PORT, OPENSEARCH_ID_PREFIX, OPENSEARCH_INDEX, OPENSEARCH_PATH_PREFIX, OPENSEARCH_BUCKET.')
+ sys.exit(1)
+
+ # AWS session
+ aws_session_param = {}
+ aws_session_param['aws_access_key_id'] = AWS_ACCESS_KEY_ID
+ aws_session_param['aws_secret_access_key'] = AWS_SECRET_ACCESS_KEY
+
+ if AWS_SESSION_TOKEN:
+ aws_session_param['aws_session_token'] = AWS_SESSION_TOKEN
+
+ aws_session = boto3.Session(**aws_session_param)
+
+ # AWS auth
+ aws_auth = AWS4Auth(
+ aws_session.get_credentials().access_key,
+ aws_session.get_credentials().secret_key,
+ aws_session.region_name,
+ 'es',
+ session_token=aws_session.get_credentials().token
+ )
+
+ # S3 paginator
+ s3 = aws_session.client('s3')
+ lambda_client = aws_session.client('lambda')
+
+ phase = state.get('state', 'start')
+ phase = PHASES[phase]
+ marker = state.get('marker')
+ keys: list = state.get('keys', [])
+
+ opensearch_client = Elasticsearch(
+ hosts=[{'host': OPENSEARCH_ENDPOINT, 'port': OPENSEARCH_PORT}],
+ http_auth=aws_auth,
+ use_ssl=True,
+ verify_certs=True,
+ connection_class=RequestsHttpConnection
+ )
+
+ # Go through all files in a bucket
+ count = 0
+ error_count = 0
+ error_s3_keys = []
+ missing_keys = []
+ logger.info('processing... will print out S3 keys that cannot find a match...')
+ if phase < 2:
+ if phase == 0:
+ logger.info(f'Starting listing of bucket {OPENSEARCH_BUCKET}')
+ state['listStartTime'] = datetime.now(timezone.utc)
+ else:
+ logger.info(f'Resuming listing of bucket {OPENSEARCH_BUCKET}')
+
+ logger.info(f'Listing objects newer than {state["lastListTime"].strftime("%Y-%m-%dT%H:%M:%S%z")}')
+
+ while True:
+ list_kwargs = dict(
+ Bucket=OPENSEARCH_BUCKET,
+ Prefix=OPENSEARCH_PATH_PREFIX,
+ MaxKeys=1000
+ )
+
+ if marker:
+ list_kwargs['ContinuationToken'] = marker
+
+ page = s3.list_objects_v2(**list_kwargs)
+ keys_to_add = []
+
+ for key in page.get('Contents', []):
+ if key['Key'].endswith('parquet') and key['LastModified'] >= state['lastListTime']:
+ keys_to_add.append(key['Key'])
+
+ keys.extend(keys_to_add)
+
+ logger.info(f"Listed page of {len(page.get('Contents', [])):,} objects; selected {len(keys_to_add):,}; "
+ f"total={len(keys):,}")
+
+ if lambda_ctx:
+ remaining_time = timedelta(milliseconds=lambda_ctx.get_remaining_time_in_millis())
+ logger.info(f'Remaining time: {remaining_time}')
+
+ if not page['IsTruncated']:
+ break
+ else:
+ marker = page['NextContinuationToken']
+
+ if lambda_ctx is not None and lambda_ctx.get_remaining_time_in_millis() < (60 * 1000):
+ logger.warning('Lambda is about to time out, re-invoking to resume from this key')
+
+ # Carry both timestamps forward so reinvoke() can persist them
+ state = dict(
+ state='list',
+ marker=marker,
+ keys=keys,
+ lastListTime=state['lastListTime'],
+ listStartTime=state['listStartTime']
+ )
+
+ reinvoke(state, OPENSEARCH_BUCKET, s3, lambda_client, lambda_ctx.function_name)
+ return
+
+ state['lastListTime'] = state['listStartTime']
+ del state['listStartTime']
+
+ # if phase == 3 and marker is not None and marker in keys:
+ # logger.info(f'Resuming audit from key {marker}')
+ # index = keys.index(marker)
+ # else:
+ # logger.info('Starting audit from the beginning')
+ # index = 0
+ #
+ # keys = keys[index:]
+
+ n_keys = len(keys)
+
+ logger.info(f'Beginning audit on {n_keys:,} keys...')
+
+ need_to_resume = False
+
+ while len(keys) > 0:
+ key = keys.pop(0)
+ count += 1
+ try:
+ # Search for the key in OpenSearch; the ID prefix already ends with '/'
+ opensearch_id = OPENSEARCH_ID_PREFIX + key
+ opensearch_response = opensearch_client.get(index=OPENSEARCH_INDEX, id=opensearch_id)
+ if not isinstance(opensearch_response, dict) or not opensearch_response.get('found'):
+ error_count += 1
+ error_s3_keys.append(key)
+ sys.stdout.write("\x1b[2K") # ANSI 'erase line'
+ logger.info(key)
+ missing_keys.append(key)
+ except NotFoundError:
+ # Document not in the index -> Parquet file is missing a stats record
+ error_count += 1
+ error_s3_keys.append(key)
+ sys.stdout.write("\x1b[2K")
+ logger.info(key)
+ missing_keys.append(key)
+ except Exception:
+ # Other errors (e.g. transient connection issues) are counted but the key is not marked missing
+ error_count += 1
+
+ if count % 50 == 0:
+ logger.info(f'Checked {count} files [{(count/n_keys)*100:7.3f}%]')
+ if lambda_ctx:
+ remaining_time = timedelta(milliseconds=lambda_ctx.get_remaining_time_in_millis())
+ logger.info(f'Remaining time: {remaining_time}')
+
+ if lambda_ctx is not None and lambda_ctx.get_remaining_time_in_millis() < (60 * 1000):
+ logger.warning('Lambda is about to time out, re-invoking to resume from this key')
+
+ state = dict(
+ state='audit',
+ marker=key,
+ keys=keys,
+ lastListTime=state['lastListTime']
+ )
+
+ need_to_resume = True
+
+ break
+
+ logger.info(f'Checked {count} files')
+ logger.info(f'Found {len(missing_keys):,} missing keys')
+
+ if len(missing_keys) > 0:
+ if format == 'plain':
+ if output_file:
+ with open(output_file, 'w') as f:
+ for key in missing_keys:
+ f.write(key + '\n')
+ else:
+ logger.info('Not writing to file as none was given')
+ else:
+ sqs_messages = [key_to_sqs_msg(k, OPENSEARCH_BUCKET) for k in missing_keys]
+
+ sqs = AwsSQS()
+
+ sqs_response = None
+
+ for m in sqs_messages:
+ sqs_response = sqs.send_message(output_file, m)
+
+ logger.info(f'SQS response: {repr(sqs_response)}')
+
+ if sns_topic:
+ sns = AwsSNS()
+
+ sns_response = sns.publish(
+ sns_topic,
+ f'Parquet stats audit found {len(missing_keys):,} missing keys. Trying to republish to SQS.',
+ 'Insitu audit'
+ )
+
+ logger.info(f'SNS response: {repr(sns_response)}')
+
+ if need_to_resume:
+ reinvoke(state, OPENSEARCH_BUCKET, s3, lambda_client, lambda_ctx.function_name)
+ return
+
+ # Finished, reset state to just last list time
+
+ logger.info('Audit complete! Persisting state to S3')
+
+ state = {'lastListTime': state['lastListTime'].strftime("%Y-%m-%dT%H:%M:%S%z")}
+
+ object_data = json.dumps(state).encode('utf-8')
+
+ s3.put_object(Bucket=OPENSEARCH_BUCKET, Key='AUDIT_STATE.json', Body=object_data)
+
+
+if __name__ == '__main__':
+ # Parse arguments
+ parser = argparse.ArgumentParser(
+ description='Audit parquet files in S3 against records in OpenSearch',
+ epilog=textwrap.dedent('''\
+ Environment variables:
+ AWS_ACCESS_KEY_ID : AWS access key ID for S3 bucket & OpenSearch index
+ AWS_SECRET_ACCESS_KEY : AWS secret access key for S3 bucket & OpenSearch index
+ AWS_REGION : AWS region for S3 bucket & OpenSearch index
+ OPENSEARCH_ENDPOINT : Endpoint for OpenSearch domain
+ OPENSEARCH_PORT : Port to connect to OpenSearch (Default: 443)
+ OPENSEARCH_BUCKET : Name of the bucket storing ingested Parquet files.
+ OPENSEARCH_PATH_PREFIX : Key prefix for objects in OPENSEARCH_BUCKET to audit.
+ OPENSEARCH_ID_PREFIX : S3 URI prefix for the id field in OpenSearch documents. Defaults to 's3://<OPENSEARCH_BUCKET>/'
+ OPENSEARCH_INDEX : OpenSearch index to audit
+ '''),
+ formatter_class=argparse.RawDescriptionHelpFormatter
+ )
+ parser.add_argument(
+ '-o', '--output-file',
+ nargs='?',
+ type=str,
+ help='file to output the S3 keys of the files that are not found in OpenSearch',
+ dest='output'
+ )
+
+ parser.add_argument(
+ '-f', '--format',
+ choices=['plain', 'mock-s3'],
+ default='plain',
+ dest='format',
+ help='Output format. \'plain\' will output keys of missing parquet files to the output file in plain text. '
+ '\'mock-s3\' will output missing keys to SQS (-o is required as the SQS queue URL), formatted as an S3 '
+ 'object created event'
+ )
+
+ def utcfromisoformat(s):
+ return datetime.fromisoformat(s).astimezone(timezone.utc)
+
+ parser.add_argument(
+ '--from-time',
+ type=utcfromisoformat,
+ default=datetime(1970, 1, 1, tzinfo=timezone.utc),
+ dest='llt',
+ help='Check all objects newer than this time as ISO datetime string. Default: 1970-01-01'
+ )
+
+ args = parser.parse_args()
+ audit(args.format, args.output, state=dict(lastListTime=args.llt))
diff --git a/parquet_flask/aws/__init__.py b/parquet_flask/aws/__init__.py
index fa8ccd5..e47d525 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_flask/aws/__init__.py
@@ -11,4 +11,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
+from parquet_flask.aws.aws_sqs import AwsSQS
+from parquet_flask.aws.aws_sns import AwsSNS
diff --git a/parquet_flask/aws/__init__.py b/parquet_flask/aws/aws_sns.py
similarity index 61%
copy from parquet_flask/aws/__init__.py
copy to parquet_flask/aws/aws_sns.py
index fa8ccd5..3062078 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_flask/aws/aws_sns.py
@@ -11,4 +11,22 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
+import logging
+from parquet_flask.aws.aws_cred import AwsCred
+
+LOGGER = logging.getLogger(__name__)
+
+
+class AwsSNS(AwsCred):
+ def __init__(self):
+ super().__init__()
+ self.__sns_client = self.get_client('sns')
+
+ def publish(self, topic_arn: str, message: str, subject: str):
+ return self.__sns_client.publish(
+ TopicArn=topic_arn,
+ Message=message,
+ Subject=subject
+ )
diff --git a/parquet_flask/aws/__init__.py b/parquet_flask/aws/aws_sqs.py
similarity index 62%
copy from parquet_flask/aws/__init__.py
copy to parquet_flask/aws/aws_sqs.py
index fa8ccd5..b63db68 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_flask/aws/aws_sqs.py
@@ -11,4 +11,21 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
+import logging
+from parquet_flask.aws.aws_cred import AwsCred
+
+LOGGER = logging.getLogger(__name__)
+
+
+class AwsSQS(AwsCred):
+ def __init__(self):
+ super().__init__()
+ self.__sqs_client = self.get_client('sqs')
+
+ def send_message(self, queue_url: str, message: str):
+ return self.__sqs_client.send_message(
+ QueueUrl=queue_url,
+ MessageBody=message
+ )
diff --git a/parquet_flask/aws/__init__.py b/parquet_flask/cdms_lambda_func/audit_tool/__init__.py
similarity index 95%
copy from parquet_flask/aws/__init__.py
copy to parquet_flask/cdms_lambda_func/audit_tool/__init__.py
index fa8ccd5..8afd240 100644
--- a/parquet_flask/aws/__init__.py
+++ b/parquet_flask/cdms_lambda_func/audit_tool/__init__.py
@@ -11,4 +11,5 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
+
diff --git a/parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py b/parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py
new file mode 100644
index 0000000..7e0568b
--- /dev/null
+++ b/parquet_flask/cdms_lambda_func/audit_tool/execute_lambda.py
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from parquet_cli.audit_tool import audit
+import os
+import json
+import logging
+import traceback
+from io import BytesIO
+
+from datetime import datetime, timezone
+
+import boto3
+
+# logging.basicConfig(
+# level=logging.INFO,
+# format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s'
+# )
+
+
+# Build process:
+# 1: cd to incubator-sdap-in-situ-data-services/parquet_flask/cdms_lambda_func/audit_tool
+# 2: mkdir package
+# 3: rename incubator-sdap-in-situ-data-services/setup_lambda.py to incubator-sdap-in-situ-data-services/setup.py
+# 4: pip install --target ./package ../../..
+# 5: cd package
+# 6: zip -r ../audit_lambda.zip .
+# 7: Upload zip to lambda
+# 8: Clean up
+
+
+def execute_code(event, context):
+ state = None
+ s3 = boto3.client('s3')
+
+ if event:
+ if isinstance(event, str):
+ event = json.loads(event)
+
+ if 'State' in event:
+ buf = BytesIO()
+
+ s3.download_fileobj(event['State']['Bucket'], event['State']['Key'], buf)
+ buf.seek(0)
+ state = json.load(buf)
+
+ print('Loaded persisted state from S3', flush=True)
+
+ if state is None:
+ try:
+ buf = BytesIO()
+
+ s3.download_fileobj(os.getenv('OPENSEARCH_BUCKET'), 'AUDIT_STATE.json', buf)
+ buf.seek(0)
+ state = json.load(buf)
+
+ print('Loaded persisted state from S3', flush=True)
+ except Exception:
+ print('Could not load state, starting from scratch')
+ traceback.print_exc()
+ state = {}
+
+ # Parse any persisted timestamps back into datetimes
+ for time_key in ('lastListTime', 'listStartTime'):
+ if time_key in state:
+ state[time_key] = datetime.strptime(state[time_key], "%Y-%m-%dT%H:%M:%S%z")
+
+ if 'lastListTime' not in state:
+ state['lastListTime'] = datetime(1, 1, 1, tzinfo=timezone.utc)
+
+ print('INVOKING AUDIT TOOL', flush=True)
+ print('', flush=True)
+
+ audit(
+ 'mock-s3',
+ os.getenv('SQS_URL'),
+ os.getenv('SNS_ARN'),
+ state=state,
+ lambda_ctx=context
+ )
diff --git a/setup.py b/setup.py
index 4ec925c..1380592 100644
--- a/setup.py
+++ b/setup.py
@@ -50,7 +50,7 @@ setup(
install_requires=install_requires,
author="Apache SDAP",
author_email="dev@sdap.apache.org",
- python_requires="==3.7",
+ python_requires=">=3.7",
license='NONE',
include_package_data=True,
)
diff --git a/setup_lambda.py b/setup_lambda.py
index f06b623..1e0e6a4 100644
--- a/setup_lambda.py
+++ b/setup_lambda.py
@@ -34,7 +34,7 @@ setup(
install_requires=install_requires,
author="Apache SDAP",
author_email="dev@sdap.apache.org",
- python_requires="==3.7",
+ python_requires=">=3.7",
license='NONE',
include_package_data=True,
)