Posted to commits@sdap.apache.org by rk...@apache.org on 2024/02/05 21:36:29 UTC

(incubator-sdap-in-situ-data-services) 01/02: Handle ingesting previously ingested files

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git

commit 3ed32ba0579b09076176648dbb575afc2483da0f
Author: rileykk <ri...@jpl.nasa.gov>
AuthorDate: Mon Feb 5 13:34:07 2024 -0800

    Handle ingesting previously ingested files
    
    - Check if the file has been ingested before
    - If so, query the stats index for all the parquet objects in S3 from the previous ingest job
    - After successful ingest, delete the old objects
    - Stats indexing lambda will clean up the stats index entries
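
    To find what the earlier ingest job wrote, the stats index is queried with a
    wildcard on the job_id segment of each object's S3 URL. A minimal sketch of
    that lookup with the plain elasticsearch-py client (8.x-style call; the actual
    code goes through the project's ESFactory 'AWS' wrapper, and the endpoint and
    job id below are made up):

        from elasticsearch import Elasticsearch

        es = Elasticsearch('https://example-es-endpoint:443')        # hypothetical endpoint
        old_job_id = '3fa85f64-5717-4562-b3fc-2c963f66afa6'          # uuid of the previous ingest job
        resp = es.search(
            index='parquet_stats_alias',
            query={'wildcard': {'s3_url': {'value': f'*/job_id={old_job_id}/*'}}},
            size=1000,
        )
        # each hit's _id holds the parquet object's S3 URL, later split into bucket/key for deletion
        old_parquet_urls = [hit['_id'] for hit in resp['hits']['hits']]
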
---
 parquet_flask/aws/aws_s3.py         | 19 +++++++++
 parquet_flask/v1/ingest_aws_json.py | 78 ++++++++++++++++++++++++++++++++-----
 2 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/parquet_flask/aws/aws_s3.py b/parquet_flask/aws/aws_s3.py
index e9bcd49..840625c 100644
--- a/parquet_flask/aws/aws_s3.py
+++ b/parquet_flask/aws/aws_s3.py
@@ -140,3 +140,22 @@ class AwsS3(AwsCred):
         self.__s3_client.download_file(self.__target_bucket, self.__target_key, local_file_path)
         LOGGER.debug(f'file downloaded')
         return local_file_path
+
+    def delete_keys(self, bucket, keys):
+        batches = [keys[i:i + 1000] for i in range(0, len(keys), 1000)]  # DeleteObjects accepts at most 1,000 keys per request
+
+        for batch in batches:
+            LOGGER.debug(f'Deleting {len(batch):,} objects')
+            resp = self.__s3_client.delete_objects(Bucket=bucket, Delete=dict(Objects=batch))
+
+            if len(resp.get('Deleted', [])) != len(batch):
+                LOGGER.error(f'{len(resp.get("Errors", [])):,} objects could not be deleted')
+
+                retries = 3
+                while len(resp.get('Errors', [])) > 0 and retries > 0:
+                    retries -= 1
+                    LOGGER.debug(f'Retrying {len(resp["Errors"])} objects')
+                    resp = self.__s3_client.delete_objects(
+                        Bucket=bucket,
+                        Delete=dict(Objects=[dict(Key=e['Key']) for e in resp['Errors']])
+                    )
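
For reference, the new delete_keys helper takes a bucket name plus a list of
{'Key': ...} dicts (matching how it is called from ingest_aws_json.py below) and
deletes them in batches. A minimal usage sketch, with a made-up bucket and keys
and the module path inferred from the file header above:

    from parquet_flask.aws.aws_s3 import AwsS3

    old_keys = [{'Key': f'parquet/job_id=3fa85f64/part-{i:05d}.parquet'} for i in range(3)]
    AwsS3().delete_keys('my-insitu-parquet-bucket', old_keys)
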
diff --git a/parquet_flask/v1/ingest_aws_json.py b/parquet_flask/v1/ingest_aws_json.py
index ae331c3..833220f 100644
--- a/parquet_flask/v1/ingest_aws_json.py
+++ b/parquet_flask/v1/ingest_aws_json.py
@@ -39,7 +39,7 @@ class IngestAwsJsonProps:
         self.__s3_sha_url = None
         self.__uuid = str(uuid.uuid4())
         self.__working_dir = f'/tmp/{str(uuid.uuid4())}'
-        self.__is_replacing = False
+        self.__is_replacing = True
         self.__is_sanitizing = True
         self.__wait_till_complete = True
 
@@ -146,6 +146,8 @@ class IngestAwsJson:
         config = Config()
         es_url = config.get_value(Config.es_url)
         es_port = int(config.get_value(Config.es_port, '443'))
+        self.__es_url = es_url
+        self.__es_port = es_port
         self.__es: ESAbstract = ESFactory().get_instance('AWS', index='', base_url=es_url, port=es_port)
         self.__db_io: MetadataTblInterface = MetadataTblES(self.__es)
 
@@ -219,9 +221,9 @@ class IngestAwsJson:
             FileUtils.del_file(self.__saved_file_name)
             return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500
         if self.__sha512_result is True:
-            return {'message': 'ingested', 'job_id': self.__props.uuid}, 201
+            return {'message': 'ingested', 'job_id': self.__props.uuid, 'reingest': self.__props.is_replacing}, 201
         return {'message': 'ingested, different sha512', 'cause': self.__sha512_cause,
-                'job_id': self.__props.uuid}, 203
+                'job_id': self.__props.uuid, 'reingest': self.__props.is_replacing}, 203
 
     def ingest(self):
         """
@@ -237,16 +239,49 @@ class IngestAwsJson:
         try:
             LOGGER.debug(f'starting to ingest: {self.__props.s3_url}')
             existing_record = self.__db_io.get_by_s3_url(self.__props.s3_url)
-            if existing_record is None and self.__props.is_replacing is True:
-                LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
-                return {'message': 'unable to replace file as it is new'}, 500
-
             if existing_record is not None and self.__props.is_replacing is False:
-                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. ingested record: {existing_record}')
+                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. '
+                             f'ingested record: {existing_record}')
                 return {'message': 'unable to ingest file as it is already ingested'}, 500
 
+            if existing_record is None and self.__props.is_replacing is True:
+                # LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
+                # return {'message': 'unable to replace file as it is new'}, 500
+                LOGGER.info(f'File {self.__props.s3_url} is new and does not need to be replaced')
+                self.__props.is_replacing = False
+
             s3 = AwsS3().set_s3_url(self.__props.s3_url)
             LOGGER.debug(f'downloading s3 file: {self.__props.uuid}')
+            LOGGER.debug(f'Existing record: {existing_record}')
+
+            if existing_record is not None:
+                ingest_uuid_to_replace = existing_record['uuid']
+
+                stats_es: ESAbstract = ESFactory().get_instance('AWS', index='parquet_stats_alias',
+                                                                base_url=self.__es_url, port=self.__es_port)
+
+                parquet_stats = stats_es.query_with_scroll(
+                    {
+                        "query": {
+                            "wildcard": {
+                                "s3_url": {
+                                    "value": f"*/job_id={ingest_uuid_to_replace}/*"
+                                }
+                            }
+                        }
+                    },
+                    querying_index='parquet_stats_alias'
+                )
+
+                parquet_files = [s3.split_s3_url(doc['_id']) for doc in parquet_stats['hits']['hits']]
+
+                if len(parquet_files) == 0:
+                    LOGGER.warning(f'Could not find any S3 objects to delete for ingest job '
+                                   f'[{ingest_uuid_to_replace}] that needs to be replaced!')
+
+            else:
+                parquet_files = []
+
             FileUtils.mk_dir_p(self.__props.working_dir)
             self.__saved_file_name = s3.download(self.__props.working_dir)
             self.__file_sha512 = FileUtils.get_checksum(self.__saved_file_name)
@@ -255,9 +290,32 @@ class IngestAwsJson:
                 self.__saved_file_name = FileUtils.gunzip_file_os(self.__saved_file_name)
             self.__compare_sha512(self.__get_s3_sha512())
             if self.__props.wait_till_complete is True:
-                return self.__execute_ingest_data()
+                ret = self.__execute_ingest_data()
+
+                if existing_record is None or len(parquet_files) == 0 or ret[1] not in (201, 203):
+                    return ret  # nothing to replace, or the ingest itself did not succeed
+                else:
+                    LOGGER.info(f'Deleting {len(parquet_files)} parquet objects from existing ingest')
+                    s3.delete_keys(
+                        parquet_files[0][0],
+                        [dict(Key=p[1]) for p in parquet_files]
+                    )
+                    return ret
             else:
-                bg_process = Process(target=self.__execute_ingest_data, args=())
+                if existing_record is None or len(parquet_files) == 0:
+                    bg_target = self.__execute_ingest_data
+                    args = ()
+                else:
+                    def ingest_then_delete(p_s3, p_parquet_files):
+                        if self.__execute_ingest_data()[1] in (201, 203):  # delete only after a successful ingest
+                            p_s3.delete_keys(
+                                p_parquet_files[0][0],
+                                [dict(Key=p[1]) for p in p_parquet_files]
+                            )
+                    bg_target = ingest_then_delete
+                    args = (s3, parquet_files)
+
+                bg_process = Process(target=bg_target, args=args)
                 bg_process.daemon = True
                 bg_process.start()
                 return {'message': 'ingesting. Not waiting.', 'job_id': self.__props.uuid}, 204
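
With these changes, the synchronous ingest responses also report whether an earlier
copy of the file was replaced, via the new 'reingest' field. Illustrative (body,
status) tuples as the view builds them, with placeholder values:

    ({'message': 'ingested', 'job_id': '<job uuid>', 'reingest': True}, 201)
    ({'message': 'ingested, different sha512', 'cause': '<sha512 mismatch details>',
      'job_id': '<job uuid>', 'reingest': True}, 203)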