Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/06 23:15:36 UTC

[incubator-sdap-ingester] 01/01: async solr history

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit a5d9a8f1c55fd359e82d26caac825cdd2aa256ae
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Aug 6 16:15:11 2020 -0700

    async solr history
---
 .../services/CollectionProcessor.py                |  8 +++----
 .../services/CollectionWatcher.py                  |  1 -
 .../history_manager/FileIngestionHistory.py        |  3 ++-
 .../services/history_manager/IngestionHistory.py   | 26 +++++++++++-----------
 .../history_manager/SolrIngestionHistory.py        | 11 +++++----
 collection_manager/docker/Dockerfile               | 12 +++++-----
 config_operator/config_operator/main.py            |  1 +
 .../granule_ingester/writers/SolrStore.py          |  1 +
 8 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index d790f4b..fc91e01 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -45,7 +45,7 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -59,13 +59,13 @@ class CollectionProcessor:
                         f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
-                        f"time range for collection '{collection.dataset_id}'. Skipping.")
+            logger.debug(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+                         f"time range for collection '{collection.dataset_id}'. Skipping.")
             return
 
         dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        history_manager.push(granule)
+        await history_manager.push(granule)
 
     @staticmethod
     def _file_supported(file_path: str):
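
With get_granule_status and push now coroutines, a caller can overlap the
Solr-backed history lookups across many granules instead of serializing them.
A minimal sketch of fanning the work out, assuming a process_granule coroutine
on CollectionProcessor matching the method patched above (the method name is
not visible in this hunk and is an assumption):

    import asyncio

    async def ingest_all(processor, granules, collection):
        # Each task awaits its own history lookup, message publish, and
        # history push; the Solr round-trips run in a thread pool, so the
        # tasks can make progress concurrently on one event loop.
        await asyncio.gather(*(processor.process_granule(granule, collection)
                               for granule in granules))
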
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 8911806..0d5eabd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -6,7 +6,6 @@ from typing import Dict, Callable, Set, Optional, Awaitable
 import yaml
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
-from yaml.scanner import ScannerError
 
 from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 50f2170..140ae87 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory):
         Constructor
         :param history_path:
         :param dataset_id:
-        :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp)
+        :param signature_fun: function which creates the signature of the cache,
+                              a file path string as argument and returns a string (md5sum, time stamp)
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d92cb24..ef73ccb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -37,7 +37,7 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None
 
-    def push(self, file_path: str):
+    async def push(self, file_path: str):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
@@ -46,14 +46,14 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        self._push_record(file_name, signature)
+        await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
             self._latest_ingested_file_update = os.path.getmtime(file_path)
         else:
             self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
 
-        self._save_latest_timestamp()
+        await self._save_latest_timestamp()
 
     def latest_ingested_mtime(self) -> Optional[datetime]:
         """
@@ -65,7 +65,7 @@ class IngestionHistory(ABC):
         else:
             return None
 
-    def already_ingested(self, file_path: str) -> bool:
+    async def already_ingested(self, file_path: str) -> bool:
         """
         Return a boolean indicating whether the specified file has already been ingested, based on its signature.
         :param file_path: The full path of a file to search for in the history.
@@ -74,12 +74,12 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        return signature == self._get_signature(file_name)
+        return signature == await self._get_signature(file_name)
 
-    def get_granule_status(self,
-                           file_path: str,
-                           date_from: datetime = None,
-                           date_to: datetime = None) -> GranuleStatus:
+    async def get_granule_status(self,
+                                 file_path: str,
+                                 date_from: datetime = None,
+                                 date_to: datetime = None) -> GranuleStatus:
         """
         Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested
         and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime).
@@ -96,21 +96,21 @@ class IngestionHistory(ABC):
         """
         if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path):
+        elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
     @abstractmethod
-    def _save_latest_timestamp(self):
+    async def _save_latest_timestamp(self):
         pass
 
     @abstractmethod
-    def _push_record(self, file_name, signature):
+    async def _push_record(self, file_name, signature):
         pass
 
     @abstractmethod
-    def _get_signature(self, file_name):
+    async def _get_signature(self, file_name):
         pass
 
     @staticmethod
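
The three persistence hooks are now abstract coroutines, so every subclass
must expose an awaitable interface even when its storage is synchronous.
A minimal in-memory subclass, illustrative only (this class does not exist
in the repository):

    from collection_manager.services.history_manager.IngestionHistory import IngestionHistory

    class InMemoryIngestionHistory(IngestionHistory):
        """Keeps signatures in a dict; handy for tests."""

        def __init__(self, dataset_id, signature_fun):
            self._dataset_id = dataset_id
            self._signature_fun = signature_fun
            self._records = {}

        async def _save_latest_timestamp(self):
            pass  # nothing to persist outside of memory

        async def _push_record(self, file_name, signature):
            self._records[file_name] = signature

        async def _get_signature(self, file_name):
            return self._records.get(file_name)
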
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 79d6eef..59f5cd7 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -4,10 +4,12 @@ import logging
 import pysolr
 import requests
 
+from common.async_utils.AsyncUtils import run_in_executor
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
 from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
+logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
 
@@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory):
     def __del__(self):
         self._req_session.close()
 
+    @run_in_executor
     def _push_record(self, file_name, signature):
         hash_id = doc_key(self._dataset_id, file_name)
         self._solr_granules.delete(q=f"id:{hash_id}")
@@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory):
         self._solr_granules.commit()
         return None
 
+    @run_in_executor
     def _save_latest_timestamp(self):
         if self._solr_datasets:
             self._solr_datasets.delete(q=f"id:{self._dataset_id}")
@@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory):
         else:
             return None
 
+    @run_in_executor
     def _get_signature(self, file_name):
         hash_id = doc_key(self._dataset_id, file_name)
         results = self._solr_granules.search(q=f"id:{hash_id}")
@@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "granule_s", "string")
                 self._add_field(schema_endpoint, "granule_signature_s", "string")
 
-            else:
-                logger.info(f"collection {self._granule_collection_name} already exists")
-
             if self._dataset_collection_name not in existing_collections:
                 # Create collection
                 payload = {'action': 'CREATE',
@@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "dataset_s", "string")
                 self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
 
-            else:
-                logger.info(f"collection {self._dataset_collection_name} already exists")
-
         except requests.exceptions.RequestException as e:
             logger.error(f"solr instance unreachable {self._solr_url}")
             raise e
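
The run_in_executor decorator imported from common.async_utils is what lets
these three synchronous pysolr methods satisfy the async abstract methods:
each blocking Solr call is pushed onto a thread pool and awaited. The
decorator's implementation is not part of this diff; a sketch of the standard
pattern it presumably follows:

    import asyncio
    import functools

    def run_in_executor(f):
        """Wrap a blocking function so calling it returns an awaitable
        that runs in the event loop's default thread-pool executor."""
        @functools.wraps(f)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None, functools.partial(f, *args, **kwargs))
        return wrapper
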
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index ce1b577..2a57784 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add
 RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list
 RUN apt-get update && apt-get install -y kubectl
 
-COPY /collection_manager /collection_manager/collection_manager
-COPY /setup.py /collection_manager/setup.py
-COPY /requirements.txt /collection_manager/requirements.txt
-COPY /README.md /collection_manager/README.md
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY collection_manager/collection_manager /collection_manager/collection_manager
+COPY collection_manager/setup.py /collection_manager/setup.py
+COPY collection_manager/requirements.txt /collection_manager/requirements.txt
+COPY collection_manager/README.md /collection_manager/README.md
+COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
+RUN cd /common && python setup.py install
 RUN cd /collection_manager && python setup.py install
 
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
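
Because the image now copies the shared common package alongside
collection_manager and installs both, the new COPY paths imply the build
context is the repository root rather than the collection_manager directory,
e.g. (assumed invocation) docker build -f collection_manager/docker/Dockerfile .
run from the root of the repository.
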
diff --git a/config_operator/config_operator/main.py b/config_operator/config_operator/main.py
index fbbbe6b..1df9cf6 100644
--- a/config_operator/config_operator/main.py
+++ b/config_operator/config_operator/main.py
@@ -4,6 +4,7 @@ import kopf
 from config_operator.config_source import RemoteGitConfig, LocalDirConfig
 from config_operator.k8s import K8sConfigMap
 
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index e098672..65f8b09 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,6 +21,7 @@ from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
+from common import AsyncUtils
 
 import pysolr
 from kazoo.handlers.threading import KazooTimeoutError