You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/10/29 22:34:12 UTC
[incubator-sdap-ingester] branch s3-support updated: fix signature_fun for s3
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/s3-support by this push:
new 200046c fix signature_fun for s3
200046c is described below
commit 200046cf908bb6f3e5ae61eeb900199d085d5b03
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 29 15:33:56 2020 -0700
fix signature_fun for s3
---
collection_manager/collection_manager/main.py | 21 +++++++++++----
.../history_manager/FileIngestionHistory.py | 3 +--
.../services/history_manager/IngestionHistory.py | 30 +++++++++++++---------
.../history_manager/SolrIngestionHistory.py | 7 ++---
4 files changed, 37 insertions(+), 24 deletions(-)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 044cb87..3dba6e0 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -3,8 +3,11 @@ import asyncio
import logging
import os
-from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
-from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
+from collection_manager.services import (CollectionProcessor,
+ CollectionWatcher, MessagePublisher)
+from collection_manager.services.history_manager import (
+ FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder,
+ md5sum_from_filepath)
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").setLevel(logging.WARNING)
@@ -58,11 +61,19 @@ def get_args() -> argparse.Namespace:
async def main():
try:
options = get_args()
+ ENABLE_S3 = False
+
+ if ENABLE_S3:
+ signature_fun = None
+ else:
+ signature_fun = md5sum_from_filepath
if options.history_path:
- history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
+ history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path,
+ signature_fun=signature_fun)
else:
- history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
+ history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url,
+ signature_fun=signature_fun)
async with MessagePublisher(host=options.rabbitmq_host,
username=options.rabbitmq_username,
password=options.rabbitmq_password,
@@ -72,7 +83,7 @@ async def main():
collection_watcher = CollectionWatcher(collections_path=options.collections_path,
granule_updated_callback=collection_processor.process_granule,
collections_refresh_interval=int(options.refresh),
- s3=True)
+ s3=ENABLE_S3)
await collection_watcher.start_watching()
while True:
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index ffa065f..cf92997 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -4,7 +4,6 @@ from pathlib import Path
from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
logger = logging.getLogger(__name__)
@@ -33,7 +32,7 @@ class FileIngestionHistory(IngestionHistory):
"""
self._dataset_id = dataset_id
self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
- self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
+ self._signature_fun = signature_fun
self._history_dict = {}
self._load_history_dict()
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index ea50ffb..d901690 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -38,28 +38,29 @@ class GranuleStatus(Enum):
class IngestionHistory(ABC):
_signature_fun = None
- _latest_ingested_file_update = None
+ _latest_ingested_file_update: float = None
- async def push(self, file_path: str, modified_time: datetime):
+ async def push(self, file_path: str, modified_datetime: datetime):
"""
Record a file as having been ingested.
:param file_path: The full path to the file to record.
:return: None
"""
+ modified_timestamp = int(modified_datetime.timestamp())
file_name = IngestionHistory._get_standardized_path(file_path)
- signature = self._signature_fun(file_path)
+ signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_timestamp)
await self._push_record(file_name, signature)
if not self._latest_ingested_file_update:
- self._latest_ingested_file_update = modified_time
+ self._latest_ingested_file_update = modified_timestamp
else:
- self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_time)
+ self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_timestamp)
await self._save_latest_timestamp()
async def get_granule_status(self,
file_path: str,
- modified_time: datetime,
+ modified_datetime: datetime,
date_from: datetime = None,
date_to: datetime = None) -> GranuleStatus:
"""
@@ -76,9 +77,11 @@ class IngestionHistory(ABC):
should fall in order to be "desired".
:return: A GranuleStatus enum.
"""
- if self._in_time_range(modified_time, start_date=self._latest_ingested_mtime()):
+ signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_datetime.timestamp())
+
+ if self._in_time_range(modified_datetime, start_date=self._latest_ingested_mtime()):
return GranuleStatus.DESIRED_FORWARD_PROCESSING
- elif self._in_time_range(modified_time, date_from, date_to) and not await self._already_ingested(file_path):
+ elif self._in_time_range(modified_datetime, date_from, date_to) and not await self._already_ingested(file_path, signature):
return GranuleStatus.DESIRED_HISTORICAL
else:
return GranuleStatus.UNDESIRED
@@ -102,14 +105,13 @@ class IngestionHistory(ABC):
else:
return None
- async def _already_ingested(self, file_path: str) -> bool:
+ async def _already_ingested(self, file_path: str, signature) -> bool:
"""
Return a boolean indicating whether the specified file has already been ingested, based on its signature.
:param file_path: The full path of a file to search for in the history.
:return: A boolean indicating whether this file has already been ingested or not
"""
file_name = IngestionHistory._get_standardized_path(file_path)
- signature = self._signature_fun(file_path)
return signature == await self._get_signature(file_name)
@abstractmethod
@@ -132,7 +134,11 @@ class IngestionHistory(ABC):
:param date_to: timestamp, can be None
:return: True is the update time of the file is between ts_from and ts_to. False otherwise
"""
- is_after_from = start_date.timestamp() < date if start_date else True
- is_before_to = end_date.timestamp() > date if end_date else True
+ is_after_from = start_date.timestamp() < date.timestamp() if start_date else True
+ is_before_to = end_date.timestamp() > date.timestamp() if end_date else True
return is_after_from and is_before_to
+
+ @staticmethod
+ def _signature_from_timestamp(timestamp: float):
+ return str(int(timestamp))
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 59f5cd7..ebed073 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -3,11 +3,8 @@ import logging
import pysolr
import requests
-
+from collection_manager.services.history_manager.IngestionHistory import (IngestionHistory, IngestionHistoryBuilder)
from common.async_utils.AsyncUtils import run_in_executor
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
logging.getLogger("pysolr").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@@ -40,7 +37,7 @@ class SolrIngestionHistory(IngestionHistory):
self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}")
self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}")
self._dataset_id = dataset_id
- self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
+ self._signature_fun = signature_fun
self._latest_ingested_file_update = self._get_latest_file_update()
except requests.exceptions.RequestException:
raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")