Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/06 23:15:36 UTC
[incubator-sdap-ingester] 01/01: async solr history
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit a5d9a8f1c55fd359e82d26caac825cdd2aa256ae
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Aug 6 16:15:11 2020 -0700
async solr history
---
.../services/CollectionProcessor.py | 8 +++----
.../services/CollectionWatcher.py | 1 -
.../history_manager/FileIngestionHistory.py | 3 ++-
.../services/history_manager/IngestionHistory.py | 26 +++++++++++-----------
.../history_manager/SolrIngestionHistory.py | 11 +++++----
collection_manager/docker/Dockerfile | 12 +++++-----
config_operator/config_operator/main.py | 1 +
.../granule_ingester/writers/SolrStore.py | 1 +
8 files changed, 33 insertions(+), 30 deletions(-)
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index d790f4b..fc91e01 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -45,7 +45,7 @@ class CollectionProcessor:
return
history_manager = self._get_history_manager(collection.dataset_id)
- granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+ granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -59,13 +59,13 @@ class CollectionProcessor:
f"'{collection.dataset_id}'.")
use_priority = collection.historical_priority
else:
- logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
- f"time range for collection '{collection.dataset_id}'. Skipping.")
+ logger.debug(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+ f"time range for collection '{collection.dataset_id}'. Skipping.")
return
dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
await self._publisher.publish_message(body=dataset_config, priority=use_priority)
- history_manager.push(granule)
+ await history_manager.push(granule)
@staticmethod
def _file_supported(file_path: str):
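Note: with get_granule_status and push now defined as coroutines, CollectionProcessor can only invoke the history manager from inside a running event loop. A minimal caller sketch, with hypothetical names (the import path for GranuleStatus is inferred from the diff), illustrating the new awaitable pattern:

    import asyncio
    from collection_manager.services.history_manager.IngestionHistory import GranuleStatus

    async def handle_granule(history_manager, granule, collection):
        # Both history-manager calls are awaitable after this commit.
        status = await history_manager.get_granule_status(
            granule, collection.date_from, collection.date_to)
        if status is not GranuleStatus.UNDESIRED:
            await history_manager.push(granule)

    # asyncio.run(handle_granule(history_manager, '/data/granule.nc', collection))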
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 8911806..0d5eabd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -6,7 +6,6 @@ from typing import Dict, Callable, Set, Optional, Awaitable
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
-from yaml.scanner import ScannerError
from collection_manager.entities import Collection
from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 50f2170..140ae87 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory):
Constructor
:param history_path:
:param dataset_id:
- :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp)
+ :param signature_fun: function which creates the signature of the cache;
+ takes a file path string as argument and returns a string (md5sum, timestamp)
"""
self._dataset_id = dataset_id
self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d92cb24..ef73ccb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -37,7 +37,7 @@ class IngestionHistory(ABC):
_signature_fun = None
_latest_ingested_file_update = None
- def push(self, file_path: str):
+ async def push(self, file_path: str):
"""
Record a file as having been ingested.
:param file_path: The full path to the file to record.
@@ -46,14 +46,14 @@ class IngestionHistory(ABC):
file_path = file_path.strip()
file_name = os.path.basename(file_path)
signature = self._signature_fun(file_path)
- self._push_record(file_name, signature)
+ await self._push_record(file_name, signature)
if not self._latest_ingested_file_update:
self._latest_ingested_file_update = os.path.getmtime(file_path)
else:
self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
- self._save_latest_timestamp()
+ await self._save_latest_timestamp()
def latest_ingested_mtime(self) -> Optional[datetime]:
"""
@@ -65,7 +65,7 @@ class IngestionHistory(ABC):
else:
return None
- def already_ingested(self, file_path: str) -> bool:
+ async def already_ingested(self, file_path: str) -> bool:
"""
Return a boolean indicating whether the specified file has already been ingested, based on its signature.
:param file_path: The full path of a file to search for in the history.
@@ -74,12 +74,12 @@ class IngestionHistory(ABC):
file_path = file_path.strip()
file_name = os.path.basename(file_path)
signature = self._signature_fun(file_path)
- return signature == self._get_signature(file_name)
+ return signature == await self._get_signature(file_name)
- def get_granule_status(self,
- file_path: str,
- date_from: datetime = None,
- date_to: datetime = None) -> GranuleStatus:
+ async def get_granule_status(self,
+ file_path: str,
+ date_from: datetime = None,
+ date_to: datetime = None) -> GranuleStatus:
"""
Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested
and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime).
@@ -96,21 +96,21 @@ class IngestionHistory(ABC):
"""
if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
return GranuleStatus.DESIRED_FORWARD_PROCESSING
- elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path):
+ elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
return GranuleStatus.DESIRED_HISTORICAL
else:
return GranuleStatus.UNDESIRED
@abstractmethod
- def _save_latest_timestamp(self):
+ async def _save_latest_timestamp(self):
pass
@abstractmethod
- def _push_record(self, file_name, signature):
+ async def _push_record(self, file_name, signature):
pass
@abstractmethod
- def _get_signature(self, file_name):
+ async def _get_signature(self, file_name):
pass
@staticmethod
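Since the three abstract hooks are now declared async, a subclass can satisfy them either with plain async def methods or, as SolrIngestionHistory does below, with synchronous bodies wrapped in an executor. A minimal in-memory subclass, purely illustrative and not part of this commit:

    class InMemoryIngestionHistory(IngestionHistory):
        def __init__(self, signature_fun):
            self._signature_fun = signature_fun
            self._records = {}

        async def _save_latest_timestamp(self):
            # Nothing to persist for an in-memory store.
            pass

        async def _push_record(self, file_name, signature):
            self._records[file_name] = signature

        async def _get_signature(self, file_name):
            return self._records.get(file_name)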
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 79d6eef..59f5cd7 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -4,10 +4,12 @@ import logging
import pysolr
import requests
+from common.async_utils.AsyncUtils import run_in_executor
from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
+logging.getLogger("pysolr").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory):
def __del__(self):
self._req_session.close()
+ @run_in_executor
def _push_record(self, file_name, signature):
hash_id = doc_key(self._dataset_id, file_name)
self._solr_granules.delete(q=f"id:{hash_id}")
@@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory):
self._solr_granules.commit()
return None
+ @run_in_executor
def _save_latest_timestamp(self):
if self._solr_datasets:
self._solr_datasets.delete(q=f"id:{self._dataset_id}")
@@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory):
else:
return None
+ @run_in_executor
def _get_signature(self, file_name):
hash_id = doc_key(self._dataset_id, file_name)
results = self._solr_granules.search(q=f"id:{hash_id}")
@@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory):
self._add_field(schema_endpoint, "granule_s", "string")
self._add_field(schema_endpoint, "granule_signature_s", "string")
- else:
- logger.info(f"collection {self._granule_collection_name} already exists")
-
if self._dataset_collection_name not in existing_collections:
# Create collection
payload = {'action': 'CREATE',
@@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory):
self._add_field(schema_endpoint, "dataset_s", "string")
self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
- else:
- logger.info(f"collection {self._dataset_collection_name} already exists")
-
except requests.exceptions.RequestException as e:
logger.error(f"solr instance unreachable {self._solr_url}")
raise e
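The run_in_executor decorator imported from common.async_utils.AsyncUtils is what bridges these synchronous pysolr calls to the now-async IngestionHistory interface. Its implementation is not shown in this diff; a minimal sketch of how such a decorator can work, for illustration only:

    import asyncio
    import functools

    def run_in_executor(f):
        # Dispatch the blocking call to the event loop's default thread
        # pool; the returned asyncio.Future is awaitable, so decorated
        # methods can be awaited even though they are plain functions.
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))
        return wrapper

Because the wrapper returns a Future rather than defining a coroutine, awaiting e.g. self._push_record(...) runs the Solr request on a worker thread without blocking the event loop.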
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index ce1b577..2a57784 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add
RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list
RUN apt-get update && apt-get install -y kubectl
-COPY /collection_manager /collection_manager/collection_manager
-COPY /setup.py /collection_manager/setup.py
-COPY /requirements.txt /collection_manager/requirements.txt
-COPY /README.md /collection_manager/README.md
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY collection_manager/collection_manager /collection_manager/collection_manager
+COPY collection_manager/setup.py /collection_manager/setup.py
+COPY collection_manager/requirements.txt /collection_manager/requirements.txt
+COPY collection_manager/README.md /collection_manager/README.md
+COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
+RUN cd /common && python setup.py install
RUN cd /collection_manager && python setup.py install
ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
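Since the image now copies both common/ and collection_manager/, the build context presumably has to be the repository root rather than collection_manager/ itself; for example (tag name is illustrative):

    docker build -f collection_manager/docker/Dockerfile -t sdap/collection-manager .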
diff --git a/config_operator/config_operator/main.py b/config_operator/config_operator/main.py
index fbbbe6b..1df9cf6 100644
--- a/config_operator/config_operator/main.py
+++ b/config_operator/config_operator/main.py
@@ -4,6 +4,7 @@ import kopf
from config_operator.config_source import RemoteGitConfig, LocalDirConfig
from config_operator.k8s import K8sConfigMap
+
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index e098672..65f8b09 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,6 +21,7 @@ from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
from typing import Dict
+from common import AsyncUtils
import pysolr
from kazoo.handlers.threading import KazooTimeoutError