You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by rk...@apache.org on 2024/03/01 20:37:01 UTC

(incubator-sdap-in-situ-data-services) branch master updated: Indexing Lambda fix (#27)

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git


The following commit(s) were added to refs/heads/master by this push:
     new 3413e63  Indexing Lambda fix (#27)
3413e63 is described below

commit 3413e638fd14276aa51c6f74b2d0b14a99d04399
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Fri Mar 1 12:36:56 2024 -0800

    Indexing Lambda fix (#27)
    
    * Temporary solution to http --> https redirect issue
    
    * Manually unpack all S3 event records from all SQS records in the triggering Lambda event
    
    * Pin dependencies that were causing import issues when new docker img was built
    
    * Run through all S3 events in trigger event
    
    - Don't break the loop if a temp parquet file is found
    - Don't raise an error on an invalid event; just log it as an error and skip
    
    ---------
    
    Co-authored-by: Jason Min-Liang Kang <ja...@gmail.com>
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
 docker/parquet.lambda.Dockerfile                        |  2 +-
 .../index_to_es/parquet_file_es_indexer.py              |  8 ++++++--
 parquet_flask/cdms_lambda_func/s3_records/s3_2_sqs.py   | 17 ++++++++++++-----
 parquet_flask/v1/query_data_doms.py                     |  8 ++++----
 parquet_flask/v1/query_data_doms_custom_pagination.py   |  6 +++---
 setup_lambda.py                                         |  3 ++-
 6 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/docker/parquet.lambda.Dockerfile b/docker/parquet.lambda.Dockerfile
index d6df5fc..38b2730 100644
--- a/docker/parquet.lambda.Dockerfile
+++ b/docker/parquet.lambda.Dockerfile
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM public.ecr.aws/lambda/python:3.7
+FROM public.ecr.aws/lambda/python:3.7.2022.09.27.11
 
 # Reference: https://aws.plainenglish.io/spark-on-aws-lambda-c65877c0ac96
 #USER root
diff --git a/parquet_flask/cdms_lambda_func/index_to_es/parquet_file_es_indexer.py b/parquet_flask/cdms_lambda_func/index_to_es/parquet_file_es_indexer.py
index 523adfa..66d32af 100644
--- a/parquet_flask/cdms_lambda_func/index_to_es/parquet_file_es_indexer.py
+++ b/parquet_flask/cdms_lambda_func/index_to_es/parquet_file_es_indexer.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import os
+import json
 
 from parquet_flask.parquet_stat_extractor.local_statistics_retriever import LocalStatisticsRetriever
 from parquet_flask.utils.file_utils import FileUtils
@@ -80,13 +81,16 @@ class ParquetFileEsIndexer:
         #         'match_all': {}
         #     }
         # }))
+
+        LOGGER.debug(f'Triggering event:\n{json.dumps(event, indent=4)}')
+
         s3_records = S3ToSqs(event)
         ignoring_phrases = ['spark-staging', '_temporary']
         for i in range(s3_records.size()):
             self.__s3_url = s3_records.get_s3_url(i)
             if any([k in self.__s3_url for k in ignoring_phrases]):
                 LOGGER.debug(f'skipping temp file: {self.__s3_url}')
-                return
+                continue
             LOGGER.debug(f'executing: {self.__s3_url}')
             s3_event = s3_records.get_event_name(i).strip().lower()
             if s3_event.startswith('objectcreated'):
@@ -96,5 +100,5 @@ class ParquetFileEsIndexer:
                 LOGGER.debug('executing to remove index')
                 self.remove_file()
             else:
-                raise ValueError(f'invalid s3_event: {s3_event}')
+                LOGGER.error(f'invalid s3_event: {s3_event}; skipping it')
         return
diff --git a/parquet_flask/cdms_lambda_func/s3_records/s3_2_sqs.py b/parquet_flask/cdms_lambda_func/s3_records/s3_2_sqs.py
index 09c9c1d..e59c09d 100644
--- a/parquet_flask/cdms_lambda_func/s3_records/s3_2_sqs.py
+++ b/parquet_flask/cdms_lambda_func/s3_records/s3_2_sqs.py
@@ -146,14 +146,21 @@ class S3ToSqs(S3EventValidatorAbstract):
         if is_valid is False:
             raise ValueError(f'invalid OUTER_SCHEMA: {self.__event} vs {self.OUTER_SCHEMA}. errors: {validation_err}')
         self.__s3_record = []
-        for each_s3_record in self.__event['Records']:
-            s3_record = each_s3_record['body']
+
+        # Unpack SQS messages from lambda event
+
+        for each_sqs_record in self.__event['Records']:
+            s3_record = each_sqs_record['body']
             if isinstance(s3_record, str):
                 s3_record = json.loads(s3_record)
             is_valid, validation_err = GeneralUtils.is_json_valid(s3_record, self.S3_RECORD_SCHEMA)
             if is_valid is False:
                 raise ValueError(f'invalid S3_RECORD_SCHEMA: {s3_record} vs {self.S3_RECORD_SCHEMA}. errors: {validation_err}')
-            self.__s3_record.append(s3_record)
+
+            # Unpack S3 events from SQS message. (I believe multiple events can be consolidated into single messages)
+
+            for each_s3_record in s3_record['Records']:
+                self.__s3_record.append(each_s3_record)
         return self
 
     def size(self):
@@ -166,7 +173,7 @@ class S3ToSqs(S3EventValidatorAbstract):
             self.__is_valid()
         if index >= len(self.__s3_record):
             raise ValueError(f'index: {index} is larger than s3_record array size: {len(self.__s3_record)}')
-        s3_url = f"s3://{self.__s3_record[index]['Records'][0]['s3']['bucket']['name']}/{self.__s3_record[index]['Records'][0]['s3']['object']['key']}"
+        s3_url = f"s3://{self.__s3_record[index]['s3']['bucket']['name']}/{self.__s3_record[index]['s3']['object']['key']}"
         LOGGER.debug(f'original s3_url: {s3_url}')
         s3_url = unquote_plus(s3_url)
         LOGGER.debug(f'unquoted s3_url: {s3_url}')
@@ -177,4 +184,4 @@ class S3ToSqs(S3EventValidatorAbstract):
             self.__is_valid()
         if index >= len(self.__s3_record):
             raise ValueError(f'index: {index} is larger than s3_record array size: {len(self.__s3_record)}')
-        return self.__s3_record[index]['Records'][0]['eventName']
+        return self.__s3_record[index]['eventName']
diff --git a/parquet_flask/v1/query_data_doms.py b/parquet_flask/v1/query_data_doms.py
index 2c6c4a4..3d1d8e5 100644
--- a/parquet_flask/v1/query_data_doms.py
+++ b/parquet_flask/v1/query_data_doms.py
@@ -96,10 +96,10 @@ class IngestParquet(Resource):
             result_set = query.search()
             LOGGER.debug(f'search params: {payload}b')
             page_info = self.__calculate_4_ranges(result_set['total'])
-            result_set['last'] = f'{request.base_url}?{self.__replace_start_from(page_info["last"])}'
-            result_set['first'] = f'{request.base_url}?{self.__replace_start_from(page_info["first"])}'
-            result_set['next'] = f'{request.base_url}?{self.__replace_start_from(page_info["next"])}'
-            result_set['prev'] = f'{request.base_url}?{self.__replace_start_from(page_info["prev"])}'
+            result_set['last'] = f'{request.base_url}?{self.__replace_start_from(page_info["last"])}'.replace('http://', 'https://')
+            result_set['first'] = f'{request.base_url}?{self.__replace_start_from(page_info["first"])}'.replace('http://', 'https://')
+            result_set['next'] = f'{request.base_url}?{self.__replace_start_from(page_info["next"])}'.replace('http://', 'https://')
+            result_set['prev'] = f'{request.base_url}?{self.__replace_start_from(page_info["prev"])}'.replace('http://', 'https://')
             return result_set, 200
         except Exception as e:
             LOGGER.exception(f'failed to query parquet. cause: {str(e)}')
diff --git a/parquet_flask/v1/query_data_doms_custom_pagination.py b/parquet_flask/v1/query_data_doms_custom_pagination.py
index 856595b..b12e922 100644
--- a/parquet_flask/v1/query_data_doms_custom_pagination.py
+++ b/parquet_flask/v1/query_data_doms_custom_pagination.py
@@ -73,12 +73,12 @@ class IngestParquet(Resource):
         if 'markerPlatform' in new_args:
             new_args.pop('markerPlatform')
         new_args = '&'.join([f'{k}={v}' for k, v in new_args.items()])
-        return f'{request.base_url}?{new_args}'
+        return f'{request.base_url}?{new_args}'.replace('http://', 'https://')
 
     def __get_prev_page_url(self):
         new_args = deepcopy(dict(request.args))
         new_args = '&'.join([f'{k}={v}' for k, v in new_args.items()])
-        return f'{request.base_url}?{new_args}'
+        return f'{request.base_url}?{new_args}'.replace('http://', 'https://')
 
     def __get_next_page_url(self, query_result: list):
         if len(query_result) < 1:
@@ -88,7 +88,7 @@ class IngestParquet(Resource):
         new_args['markerTime'] = last_item[CDMSConstants.time_col]
         new_args['markerPlatform'] = GeneralUtils.gen_sha_256_json_obj(last_item)
         new_args = '&'.join([f'{k}={v}' for k, v in new_args.items()])
-        return f'{request.base_url}?{new_args}'
+        return f'{request.base_url}?{new_args}'.replace('http://', 'https://')
 
     def __execute_query(self, payload):
         """
diff --git a/setup_lambda.py b/setup_lambda.py
index 88ae263..f06b623 100644
--- a/setup_lambda.py
+++ b/setup_lambda.py
@@ -18,12 +18,13 @@ from setuptools import find_packages, setup
 
 install_requires = [
     # 'fastparquet===0.5.0',  # not using it. sticking to pyspark with spark cluster according to Nga
-    'jsonschema',  # to verify json objects
+    'jsonschema==4.16.0',  # to verify json objects
     'fastjsonschema===2.15.1',
     'requests===2.26.0',
     'boto3', 'botocore',
     'requests_aws4auth===1.1.1',  # to send aws signed headers in requests
     'elasticsearch===7.13.4',
+    'typing_extensions==4.3.0'
 ]
 
 setup(