You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/06 23:15:35 UTC
[incubator-sdap-ingester] branch async-history created (now a5d9a8f)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a change to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.
at a5d9a8f async solr history
This branch includes the following new commits:
new a5d9a8f async solr history
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[incubator-sdap-ingester] 01/01: async solr history
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit a5d9a8f1c55fd359e82d26caac825cdd2aa256ae
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Aug 6 16:15:11 2020 -0700
async solr history
---
.../services/CollectionProcessor.py | 8 +++----
.../services/CollectionWatcher.py | 1 -
.../history_manager/FileIngestionHistory.py | 3 ++-
.../services/history_manager/IngestionHistory.py | 26 +++++++++++-----------
.../history_manager/SolrIngestionHistory.py | 11 +++++----
collection_manager/docker/Dockerfile | 12 +++++-----
config_operator/config_operator/main.py | 1 +
.../granule_ingester/writers/SolrStore.py | 1 +
8 files changed, 33 insertions(+), 30 deletions(-)
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index d790f4b..fc91e01 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -45,7 +45,7 @@ class CollectionProcessor:
return
history_manager = self._get_history_manager(collection.dataset_id)
- granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+ granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -59,13 +59,13 @@ class CollectionProcessor:
f"'{collection.dataset_id}'.")
use_priority = collection.historical_priority
else:
- logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
- f"time range for collection '{collection.dataset_id}'. Skipping.")
+ logger.debug(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+ f"time range for collection '{collection.dataset_id}'. Skipping.")
return
dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
await self._publisher.publish_message(body=dataset_config, priority=use_priority)
- history_manager.push(granule)
+ await history_manager.push(granule)
@staticmethod
def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 8911806..0d5eabd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -6,7 +6,6 @@ from typing import Dict, Callable, Set, Optional, Awaitable
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
-from yaml.scanner import ScannerError
from collection_manager.entities import Collection
from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 50f2170..140ae87 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory):
Constructor
:param history_path:
:param dataset_id:
- :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp)
+ :param signature_fun: function which creates the signature of the cache,
+ a file path string as argument and returns a string (md5sum, time stamp)
"""
self._dataset_id = dataset_id
self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d92cb24..ef73ccb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -37,7 +37,7 @@ class IngestionHistory(ABC):
_signature_fun = None
_latest_ingested_file_update = None
- def push(self, file_path: str):
+ async def push(self, file_path: str):
"""
Record a file as having been ingested.
:param file_path: The full path to the file to record.
@@ -46,14 +46,14 @@ class IngestionHistory(ABC):
file_path = file_path.strip()
file_name = os.path.basename(file_path)
signature = self._signature_fun(file_path)
- self._push_record(file_name, signature)
+ await self._push_record(file_name, signature)
if not self._latest_ingested_file_update:
self._latest_ingested_file_update = os.path.getmtime(file_path)
else:
self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
- self._save_latest_timestamp()
+ await self._save_latest_timestamp()
def latest_ingested_mtime(self) -> Optional[datetime]:
"""
@@ -65,7 +65,7 @@ class IngestionHistory(ABC):
else:
return None
- def already_ingested(self, file_path: str) -> bool:
+ async def already_ingested(self, file_path: str) -> bool:
"""
Return a boolean indicating whether the specified file has already been ingested, based on its signature.
:param file_path: The full path of a file to search for in the history.
@@ -74,12 +74,12 @@ class IngestionHistory(ABC):
file_path = file_path.strip()
file_name = os.path.basename(file_path)
signature = self._signature_fun(file_path)
- return signature == self._get_signature(file_name)
+ return signature == await self._get_signature(file_name)
- def get_granule_status(self,
- file_path: str,
- date_from: datetime = None,
- date_to: datetime = None) -> GranuleStatus:
+ async def get_granule_status(self,
+ file_path: str,
+ date_from: datetime = None,
+ date_to: datetime = None) -> GranuleStatus:
"""
Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested
and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime).
@@ -96,21 +96,21 @@ class IngestionHistory(ABC):
"""
if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
return GranuleStatus.DESIRED_FORWARD_PROCESSING
- elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path):
+ elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
return GranuleStatus.DESIRED_HISTORICAL
else:
return GranuleStatus.UNDESIRED
@abstractmethod
- def _save_latest_timestamp(self):
+ async def _save_latest_timestamp(self):
pass
@abstractmethod
- def _push_record(self, file_name, signature):
+ async def _push_record(self, file_name, signature):
pass
@abstractmethod
- def _get_signature(self, file_name):
+ async def _get_signature(self, file_name):
pass
@staticmethod
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 79d6eef..59f5cd7 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -4,10 +4,12 @@ import logging
import pysolr
import requests
+from common.async_utils.AsyncUtils import run_in_executor
from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
+logging.getLogger("pysolr").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory):
def __del__(self):
self._req_session.close()
+ @run_in_executor
def _push_record(self, file_name, signature):
hash_id = doc_key(self._dataset_id, file_name)
self._solr_granules.delete(q=f"id:{hash_id}")
@@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory):
self._solr_granules.commit()
return None
+ @run_in_executor
def _save_latest_timestamp(self):
if self._solr_datasets:
self._solr_datasets.delete(q=f"id:{self._dataset_id}")
@@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory):
else:
return None
+ @run_in_executor
def _get_signature(self, file_name):
hash_id = doc_key(self._dataset_id, file_name)
results = self._solr_granules.search(q=f"id:{hash_id}")
@@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory):
self._add_field(schema_endpoint, "granule_s", "string")
self._add_field(schema_endpoint, "granule_signature_s", "string")
- else:
- logger.info(f"collection {self._granule_collection_name} already exists")
-
if self._dataset_collection_name not in existing_collections:
# Create collection
payload = {'action': 'CREATE',
@@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory):
self._add_field(schema_endpoint, "dataset_s", "string")
self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
- else:
- logger.info(f"collection {self._dataset_collection_name} already exists")
-
except requests.exceptions.RequestException as e:
logger.error(f"solr instance unreachable {self._solr_url}")
raise e
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index ce1b577..2a57784 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add
RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list
RUN apt-get update && apt-get install -y kubectl
-COPY /collection_manager /collection_manager/collection_manager
-COPY /setup.py /collection_manager/setup.py
-COPY /requirements.txt /collection_manager/requirements.txt
-COPY /README.md /collection_manager/README.md
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY collection_manager/collection_manager /collection_manager/collection_manager
+COPY collection_manager/setup.py /collection_manager/setup.py
+COPY collection_manager/requirements.txt /collection_manager/requirements.txt
+COPY collection_manager/README.md /collection_manager/README.md
+COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
+RUN cd /common && python setup.py install
RUN cd /collection_manager && python setup.py install
ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
diff --git a/config_operator/config_operator/main.py b/config_operator/config_operator/main.py
index fbbbe6b..1df9cf6 100644
--- a/config_operator/config_operator/main.py
+++ b/config_operator/config_operator/main.py
@@ -4,6 +4,7 @@ import kopf
from config_operator.config_source import RemoteGitConfig, LocalDirConfig
from config_operator.k8s import K8sConfigMap
+
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index e098672..65f8b09 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,6 +21,7 @@ from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
from typing import Dict
+from common import AsyncUtils
import pysolr
from kazoo.handlers.threading import KazooTimeoutError