You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by rk...@apache.org on 2024/02/05 21:36:28 UTC

(incubator-sdap-in-situ-data-services) branch master updated (a27b9ac -> 60cc8b7)

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git


    from a27b9ac  Merge pull request #25 from RKuttruff/spark-bump
     new 3ed32ba  Handle ingesting previously ingested files
     new 60cc8b7  Changelog

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG.md                        |  1 +
 parquet_flask/aws/aws_s3.py         | 19 +++++++++
 parquet_flask/v1/ingest_aws_json.py | 78 ++++++++++++++++++++++++++++++++-----
 3 files changed, 88 insertions(+), 10 deletions(-)


(incubator-sdap-in-situ-data-services) 02/02: Changelog

Posted by rk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git

commit 60cc8b758c6bc3ac0fb5bfdbde551236daacfe6c
Author: rileykk <ri...@jpl.nasa.gov>
AuthorDate: Mon Feb 5 13:36:18 2024 -0800

    Changelog
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fbd6d59..6aea315 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - SDAP-464: Updated AWS deployment guide
 ### Changed
 - Updated Elasticsearch *query_by_id* method to accept an *index* as argument
+- Change handling of ingesting previously ingested files to replace the previous job instead of failing the current job
 - SDAP-462: Updated query logic so that depth -99999 is treated as surface (i.e. depth 0)
 - SDAP-463: Added capability to further partition parquet objects/files by platform
 ### Changed


(incubator-sdap-in-situ-data-services) 01/02: Handle ingesting previously ingested files

Posted by rk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git

commit 3ed32ba0579b09076176648dbb575afc2483da0f
Author: rileykk <ri...@jpl.nasa.gov>
AuthorDate: Mon Feb 5 13:34:07 2024 -0800

    Handle ingesting previously ingested files
    
    - Check if the file has been ingested before
    - If so, query the stats index for all the parquet objects in S3 from the previous ingest job
    - After successful ingest, delete the old objects
    - Stats indexing lambda will clean up the stats index entries
---
 parquet_flask/aws/aws_s3.py         | 19 +++++++++
 parquet_flask/v1/ingest_aws_json.py | 78 ++++++++++++++++++++++++++++++++-----
 2 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/parquet_flask/aws/aws_s3.py b/parquet_flask/aws/aws_s3.py
index e9bcd49..840625c 100644
--- a/parquet_flask/aws/aws_s3.py
+++ b/parquet_flask/aws/aws_s3.py
@@ -140,3 +140,22 @@ class AwsS3(AwsCred):
         self.__s3_client.download_file(self.__target_bucket, self.__target_key, local_file_path)
         LOGGER.debug(f'file downloaded')
         return local_file_path
+
+    def delete_keys(self, bucket, keys):
+        batches = [keys[i:i + 1000] for i in range(0, len(keys), 1000)]
+
+        for batch in batches:
+            LOGGER.debug(f'Deleting {len(batch):,} objects')
+            resp = self.__s3_client.delete_objects(Bucket=bucket, Delete=dict(Objects=batch))
+
+            if len(resp['Deleted']) != len(batch):
+                LOGGER.error(f'{len(resp["Errors"]):,} objects could not be deleted')
+
+                retries = 3
+
+                while len(resp["Errors"]) > 0 and retries > 0:
+                    LOGGER.debug(f'Retrying {len(resp["Errors"])} objects')
+                    resp = self.__s3_client.delete_objects(
+                        Bucket=bucket,
+                        Delete=dict(Objects=[dict(Key=e['Key']) for e in resp['Errors']])
+                    )
diff --git a/parquet_flask/v1/ingest_aws_json.py b/parquet_flask/v1/ingest_aws_json.py
index ae331c3..833220f 100644
--- a/parquet_flask/v1/ingest_aws_json.py
+++ b/parquet_flask/v1/ingest_aws_json.py
@@ -39,7 +39,7 @@ class IngestAwsJsonProps:
         self.__s3_sha_url = None
         self.__uuid = str(uuid.uuid4())
         self.__working_dir = f'/tmp/{str(uuid.uuid4())}'
-        self.__is_replacing = False
+        self.__is_replacing = True
         self.__is_sanitizing = True
         self.__wait_till_complete = True
 
@@ -146,6 +146,8 @@ class IngestAwsJson:
         config = Config()
         es_url = config.get_value(Config.es_url)
         es_port = int(config.get_value(Config.es_port, '443'))
+        self.__es_url = es_url
+        self.__es_port = es_port
         self.__es: ESAbstract = ESFactory().get_instance('AWS', index='', base_url=es_url, port=es_port)
         self.__db_io: MetadataTblInterface = MetadataTblES(self.__es)
 
@@ -219,9 +221,9 @@ class IngestAwsJson:
             FileUtils.del_file(self.__saved_file_name)
             return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500
         if self.__sha512_result is True:
-            return {'message': 'ingested', 'job_id': self.__props.uuid}, 201
+            return {'message': 'ingested', 'job_id': self.__props.uuid, 'reingest': self.__props.is_replacing}, 201
         return {'message': 'ingested, different sha512', 'cause': self.__sha512_cause,
-                'job_id': self.__props.uuid}, 203
+                'job_id': self.__props.uuid, 'reingest': self.__props.is_replacing}, 203
 
     def ingest(self):
         """
@@ -237,16 +239,49 @@ class IngestAwsJson:
         try:
             LOGGER.debug(f'starting to ingest: {self.__props.s3_url}')
             existing_record = self.__db_io.get_by_s3_url(self.__props.s3_url)
-            if existing_record is None and self.__props.is_replacing is True:
-                LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
-                return {'message': 'unable to replace file as it is new'}, 500
-
             if existing_record is not None and self.__props.is_replacing is False:
-                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. ingested record: {existing_record}')
+                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. '
+                             f'ingested record: {existing_record}')
                 return {'message': 'unable to ingest file as it is already ingested'}, 500
 
+            if existing_record is None and self.__props.is_replacing is True:
+                # LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
+                # return {'message': 'unable to replace file as it is new'}, 500
+                LOGGER.info(f'File {self.__props.s3_url} is new and does not need to be replaced')
+                self.__props.is_replacing = False
+
             s3 = AwsS3().set_s3_url(self.__props.s3_url)
             LOGGER.debug(f'downloading s3 file: {self.__props.uuid}')
+            LOGGER.debug(f'Existing record: {existing_record}')
+
+            if existing_record is not None:
+                ingest_uuid_to_replace = existing_record['uuid']
+
+                stats_es: ESAbstract = ESFactory().get_instance('AWS', index='parquet_stats_alias',
+                                                                base_url=self.__es_url, port=self.__es_port)
+
+                parquet_stats = stats_es.query_with_scroll(
+                    {
+                        "query": {
+                            "wildcard": {
+                                "s3_url": {
+                                    "value": f"*/job_id={ingest_uuid_to_replace}/*"
+                                }
+                            }
+                        }
+                    },
+                    querying_index='parquet_stats_alias'
+                )
+
+                parquet_files = [s3.split_s3_url(doc['_id']) for doc in parquet_stats['hits']['hits']]
+
+                if len(parquet_files) != 0:
+                    LOGGER.warning(f'Could find no S3 objects to delete for ingest job [{ingest_uuid_to_replace}] that '
+                                   f'needs to be replaced!')
+
+            else:
+                parquet_files = []
+
             FileUtils.mk_dir_p(self.__props.working_dir)
             self.__saved_file_name = s3.download(self.__props.working_dir)
             self.__file_sha512 = FileUtils.get_checksum(self.__saved_file_name)
@@ -255,9 +290,32 @@ class IngestAwsJson:
                 self.__saved_file_name = FileUtils.gunzip_file_os(self.__saved_file_name)
             self.__compare_sha512(self.__get_s3_sha512())
             if self.__props.wait_till_complete is True:
-                return self.__execute_ingest_data()
+                ret = self.__execute_ingest_data()
+
+                if existing_record is None or len(parquet_files) == 0:
+                    return ret
+                else:
+                    LOGGER.info(f'Deleting {len(parquet_files)} parquet objects from existing ingest')
+                    s3.delete_keys(
+                        parquet_files[0][0],
+                        [dict(Key=p[1]) for p in parquet_files]
+                    )
+                    return ret
             else:
-                bg_process = Process(target=self.__execute_ingest_data, args=())
+                if existing_record is None or len(parquet_files) == 0:
+                    bg_target = self.__execute_ingest_data
+                    args = ()
+                else:
+                    def ingest_then_delete(p_s3, p_parquet_files):
+                        self.__execute_ingest_data()
+                        p_s3.delete_keys(
+                            p_parquet_files[0][0],
+                            [dict(Key=p[1]) for p in p_parquet_files]
+                        )
+                    bg_target = ingest_then_delete
+                    args = (s3, parquet_files)
+
+                bg_process = Process(target=bg_target, args=args)
                 bg_process.daemon = True
                 bg_process.start()
                 return {'message': 'ingesting. Not waiting.', 'job_id': self.__props.uuid}, 204