Posted to commits@sdap.apache.org by rk...@apache.org on 2024/02/05 21:36:29 UTC
(incubator-sdap-in-situ-data-services) 01/02: Handle ingesting previously ingested files
This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git
commit 3ed32ba0579b09076176648dbb575afc2483da0f
Author: rileykk <ri...@jpl.nasa.gov>
AuthorDate: Mon Feb 5 13:34:07 2024 -0800
Handle ingesting previously ingested files
- Check if the file has been ingested before
- If so, query the stats index for all the parquet objects in S3 from the previous ingest job
- After successful ingest, delete the old objects
- Stats indexing lambda will clean up the stats index entries
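In outline, the change the bullets describe works like this minimal, self-contained sketch (the metadata store, job registry, and ingest step are stubbed as in-memory stand-ins here; the actual implementation in the diff below uses Elasticsearch and boto3):

    def reingest(s3_url, metadata, parquet_keys_by_job, run_ingest):
        # record of a previous ingest of this file, if any
        existing = metadata.get(s3_url)
        # ingest first; the new job id identifies the new parquet objects
        new_job_id = run_ingest(s3_url)
        if existing is not None:
            # only after a successful ingest, delete the old job's objects
            for key in parquet_keys_by_job.get(existing['uuid'], []):
                print(f'deleting stale parquet object: {key}')
        metadata[s3_url] = {'uuid': new_job_id}
        return new_job_id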
---
parquet_flask/aws/aws_s3.py | 19 +++++++++
parquet_flask/v1/ingest_aws_json.py | 78 ++++++++++++++++++++++++++++++++-----
2 files changed, 87 insertions(+), 10 deletions(-)
diff --git a/parquet_flask/aws/aws_s3.py b/parquet_flask/aws/aws_s3.py
index e9bcd49..840625c 100644
--- a/parquet_flask/aws/aws_s3.py
+++ b/parquet_flask/aws/aws_s3.py
@@ -140,3 +140,22 @@ class AwsS3(AwsCred):
         self.__s3_client.download_file(self.__target_bucket, self.__target_key, local_file_path)
         LOGGER.debug(f'file downloaded')
         return local_file_path
+
+    def delete_keys(self, bucket, keys):
+        # S3 DeleteObjects accepts at most 1,000 keys per request, so delete in batches
+        batches = [keys[i:i + 1000] for i in range(0, len(keys), 1000)]
+
+        for batch in batches:
+            LOGGER.debug(f'Deleting {len(batch):,} objects')
+            resp = self.__s3_client.delete_objects(Bucket=bucket, Delete=dict(Objects=batch))
+
+            if len(resp.get('Deleted', [])) != len(batch):
+                LOGGER.error(f'{len(resp.get("Errors", [])):,} objects could not be deleted')
+
+            retries = 3
+            while len(resp.get('Errors', [])) > 0 and retries > 0:
+                retries -= 1
+                LOGGER.debug(f'Retrying {len(resp["Errors"])} objects')
+                resp = self.__s3_client.delete_objects(
+                    Bucket=bucket,
+                    Delete=dict(Objects=[dict(Key=e['Key']) for e in resp['Errors']])
+                )
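S3's DeleteObjects API accepts at most 1,000 keys per request, which is why delete_keys works in batches; the keys argument is a list of {'Key': ...} dicts, matching the call site in ingest_aws_json.py below. A standalone sketch of the equivalent plain-boto3 call (bucket and key names are made up for illustration):

    import boto3

    s3_client = boto3.client('s3')
    keys = [{'Key': f'parquet/job_id=1234/part-{i:05d}.parquet'} for i in range(2500)]

    # 2,500 keys -> three DeleteObjects requests (1,000 + 1,000 + 500)
    for i in range(0, len(keys), 1000):
        resp = s3_client.delete_objects(Bucket='my-parquet-bucket',
                                        Delete={'Objects': keys[i:i + 1000]})
        for err in resp.get('Errors', []):
            print(f"failed to delete {err['Key']}: {err['Code']}")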
diff --git a/parquet_flask/v1/ingest_aws_json.py b/parquet_flask/v1/ingest_aws_json.py
index ae331c3..833220f 100644
--- a/parquet_flask/v1/ingest_aws_json.py
+++ b/parquet_flask/v1/ingest_aws_json.py
@@ -39,7 +39,7 @@ class IngestAwsJsonProps:
         self.__s3_sha_url = None
         self.__uuid = str(uuid.uuid4())
         self.__working_dir = f'/tmp/{str(uuid.uuid4())}'
-        self.__is_replacing = False
+        self.__is_replacing = True
         self.__is_sanitizing = True
         self.__wait_till_complete = True
@@ -146,6 +146,8 @@ class IngestAwsJson:
         config = Config()
         es_url = config.get_value(Config.es_url)
         es_port = int(config.get_value(Config.es_port, '443'))
+        self.__es_url = es_url
+        self.__es_port = es_port
         self.__es: ESAbstract = ESFactory().get_instance('AWS', index='', base_url=es_url, port=es_port)
         self.__db_io: MetadataTblInterface = MetadataTblES(self.__es)
@@ -219,9 +221,9 @@ class IngestAwsJson:
             FileUtils.del_file(self.__saved_file_name)
             return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500
         if self.__sha512_result is True:
-            return {'message': 'ingested', 'job_id': self.__props.uuid}, 201
+            return {'message': 'ingested', 'job_id': self.__props.uuid, 'reingest': self.__props.is_replacing}, 201
         return {'message': 'ingested, different sha512', 'cause': self.__sha512_cause,
-                'job_id': self.__props.uuid}, 203
+                'job_id': self.__props.uuid, 'reingest': self.__props.is_replacing}, 203

     def ingest(self):
         """
@@ -237,16 +239,49 @@ class IngestAwsJson:
         try:
             LOGGER.debug(f'starting to ingest: {self.__props.s3_url}')
             existing_record = self.__db_io.get_by_s3_url(self.__props.s3_url)
-            if existing_record is None and self.__props.is_replacing is True:
-                LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
-                return {'message': 'unable to replace file as it is new'}, 500
-
             if existing_record is not None and self.__props.is_replacing is False:
-                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. ingested record: {existing_record}')
+                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. '
+                             f'ingested record: {existing_record}')
                 return {'message': 'unable to ingest file as it is already ingested'}, 500
+            if existing_record is None and self.__props.is_replacing is True:
+                # LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
+                # return {'message': 'unable to replace file as it is new'}, 500
+                LOGGER.info(f'File {self.__props.s3_url} is new and does not need to be replaced')
+                self.__props.is_replacing = False
+
             s3 = AwsS3().set_s3_url(self.__props.s3_url)
             LOGGER.debug(f'downloading s3 file: {self.__props.uuid}')
+            LOGGER.debug(f'Existing record: {existing_record}')
+
+            if existing_record is not None:
+                ingest_uuid_to_replace = existing_record['uuid']
+
+                stats_es: ESAbstract = ESFactory().get_instance('AWS', index='parquet_stats_alias',
+                                                                base_url=self.__es_url, port=self.__es_port)
+
+                parquet_stats = stats_es.query_with_scroll(
+                    {
+                        "query": {
+                            "wildcard": {
+                                "s3_url": {
+                                    "value": f"*/job_id={ingest_uuid_to_replace}/*"
+                                }
+                            }
+                        }
+                    },
+                    querying_index='parquet_stats_alias'
+                )
+
+                parquet_files = [s3.split_s3_url(doc['_id']) for doc in parquet_stats['hits']['hits']]
+
+                if len(parquet_files) == 0:
+                    LOGGER.warning(f'Could not find any S3 objects to delete for ingest job '
+                                   f'[{ingest_uuid_to_replace}] that needs to be replaced!')
+
+            else:
+                parquet_files = []
+
             FileUtils.mk_dir_p(self.__props.working_dir)
             self.__saved_file_name = s3.download(self.__props.working_dir)
             self.__file_sha512 = FileUtils.get_checksum(self.__saved_file_name)
@@ -255,9 +290,32 @@ class IngestAwsJson:
             self.__saved_file_name = FileUtils.gunzip_file_os(self.__saved_file_name)
             self.__compare_sha512(self.__get_s3_sha512())
             if self.__props.wait_till_complete is True:
-                return self.__execute_ingest_data()
+                ret = self.__execute_ingest_data()
+
+                if existing_record is None or len(parquet_files) == 0:
+                    return ret
+                else:
+                    LOGGER.info(f'Deleting {len(parquet_files)} parquet objects from existing ingest')
+                    s3.delete_keys(
+                        parquet_files[0][0],
+                        [dict(Key=p[1]) for p in parquet_files]
+                    )
+                    return ret
             else:
-                bg_process = Process(target=self.__execute_ingest_data, args=())
+                if existing_record is None or len(parquet_files) == 0:
+                    bg_target = self.__execute_ingest_data
+                    args = ()
+                else:
+                    def ingest_then_delete(p_s3, p_parquet_files):
+                        self.__execute_ingest_data()
+                        p_s3.delete_keys(
+                            p_parquet_files[0][0],
+                            [dict(Key=p[1]) for p in p_parquet_files]
+                        )
+                    bg_target = ingest_then_delete
+                    args = (s3, parquet_files)
+
+                bg_process = Process(target=bg_target, args=args)
                 bg_process.daemon = True
                 bg_process.start()
                 return {'message': 'ingesting. Not waiting.', 'job_id': self.__props.uuid}, 204
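For reference: the stats-index lookup above works because each parquet object's key embeds the ingest job's UUID (".../job_id=<uuid>/...") and the stats index stores each object's S3 URL as the document _id. A standalone sketch of the same wildcard query (a single _search page rather than the scroll API the commit uses, and omitting the AWS request signing that ESFactory handles):

    import json
    import urllib.request

    def find_previous_parquet(es_base_url, job_id):
        query = {
            'query': {'wildcard': {'s3_url': {'value': f'*/job_id={job_id}/*'}}},
            'size': 1000,
        }
        req = urllib.request.Request(
            f'{es_base_url}/parquet_stats_alias/_search',
            data=json.dumps(query).encode(),
            headers={'Content-Type': 'application/json'},
        )
        with urllib.request.urlopen(req) as resp:
            hits = json.load(resp)['hits']['hits']
        # each _id is the S3 URL of one parquet object written by that job
        return [hit['_id'] for hit in hits]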