Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/06 23:15:35 UTC

[incubator-sdap-ingester] branch async-history created (now a5d9a8f)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


      at a5d9a8f  async solr history

This branch includes the following new commits:

     new a5d9a8f  async solr history

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-sdap-ingester] 01/01: async solr history

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit a5d9a8f1c55fd359e82d26caac825cdd2aa256ae
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Aug 6 16:15:11 2020 -0700

    async solr history
---
 .../services/CollectionProcessor.py                |  8 +++----
 .../services/CollectionWatcher.py                  |  1 -
 .../history_manager/FileIngestionHistory.py        |  3 ++-
 .../services/history_manager/IngestionHistory.py   | 26 +++++++++++-----------
 .../history_manager/SolrIngestionHistory.py        | 11 +++++----
 collection_manager/docker/Dockerfile               | 12 +++++-----
 config_operator/config_operator/main.py            |  1 +
 .../granule_ingester/writers/SolrStore.py          |  1 +
 8 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index d790f4b..fc91e01 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -45,7 +45,7 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -59,13 +59,13 @@ class CollectionProcessor:
                         f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
-                        f"time range for collection '{collection.dataset_id}'. Skipping.")
+            logger.debug(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+                         f"time range for collection '{collection.dataset_id}'. Skipping.")
             return
 
         dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        history_manager.push(granule)
+        await history_manager.push(granule)
 
     @staticmethod
     def _file_supported(file_path: str):
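
The hunk above turns the history-manager calls into awaited coroutines. Calling an async def method without await only creates a coroutine object; the body never runs, so both call sites had to gain an await. A minimal, self-contained sketch of the pattern (the names here are illustrative, not taken from the repository):

    import asyncio

    class DemoHistory:
        async def push(self, granule: str) -> None:
            await asyncio.sleep(0)  # stand-in for real async I/O
            print(f"recorded {granule}")

    async def main():
        history = DemoHistory()
        # history.push("g1") by itself would create a coroutine and never run it
        await history.push("g1")  # actually executes the coroutine

    asyncio.run(main())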
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 8911806..0d5eabd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -6,7 +6,6 @@ from typing import Dict, Callable, Set, Optional, Awaitable
 import yaml
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
-from yaml.scanner import ScannerError
 
 from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 50f2170..140ae87 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory):
         Constructor
         :param history_path:
         :param dataset_id:
-        :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp)
+        :param signature_fun: function which creates the signature of the cache;
+                              takes a file path string as argument and returns a string (md5sum, timestamp)
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d92cb24..ef73ccb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -37,7 +37,7 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None
 
-    def push(self, file_path: str):
+    async def push(self, file_path: str):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
@@ -46,14 +46,14 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        self._push_record(file_name, signature)
+        await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
             self._latest_ingested_file_update = os.path.getmtime(file_path)
         else:
             self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
 
-        self._save_latest_timestamp()
+        await self._save_latest_timestamp()
 
     def latest_ingested_mtime(self) -> Optional[datetime]:
         """
@@ -65,7 +65,7 @@ class IngestionHistory(ABC):
         else:
             return None
 
-    def already_ingested(self, file_path: str) -> bool:
+    async def already_ingested(self, file_path: str) -> bool:
         """
         Return a boolean indicating whether the specified file has already been ingested, based on its signature.
         :param file_path: The full path of a file to search for in the history.
@@ -74,12 +74,12 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        return signature == self._get_signature(file_name)
+        return signature == await self._get_signature(file_name)
 
-    def get_granule_status(self,
-                           file_path: str,
-                           date_from: datetime = None,
-                           date_to: datetime = None) -> GranuleStatus:
+    async def get_granule_status(self,
+                                 file_path: str,
+                                 date_from: datetime = None,
+                                 date_to: datetime = None) -> GranuleStatus:
         """
         Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested
         and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime).
@@ -96,21 +96,21 @@ class IngestionHistory(ABC):
         """
         if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path):
+        elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
     @abstractmethod
-    def _save_latest_timestamp(self):
+    async def _save_latest_timestamp(self):
         pass
 
     @abstractmethod
-    def _push_record(self, file_name, signature):
+    async def _push_record(self, file_name, signature):
         pass
 
     @abstractmethod
-    def _get_signature(self, file_name):
+    async def _get_signature(self, file_name):
         pass
 
     @staticmethod
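
With _save_latest_timestamp, _push_record, and _get_signature now declared as async abstract methods, every concrete history manager must supply coroutine implementations. A minimal in-memory sketch of a conforming subclass (a hypothetical class against an abridged base, not part of this commit):

    import asyncio
    from abc import ABC, abstractmethod

    class IngestionHistory(ABC):  # abridged stand-in for the real base class
        @abstractmethod
        async def _push_record(self, file_name, signature): ...

        @abstractmethod
        async def _get_signature(self, file_name): ...

    class InMemoryIngestionHistory(IngestionHistory):
        def __init__(self):
            self._records = {}

        async def _push_record(self, file_name, signature):
            self._records[file_name] = signature

        async def _get_signature(self, file_name):
            return self._records.get(file_name)

    async def main():
        history = InMemoryIngestionHistory()
        await history._push_record("granule.nc", "abc123")
        assert await history._get_signature("granule.nc") == "abc123"

    asyncio.run(main())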
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 79d6eef..59f5cd7 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -4,10 +4,12 @@ import logging
 import pysolr
 import requests
 
+from common.async_utils.AsyncUtils import run_in_executor
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
 from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
+logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
 
@@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory):
     def __del__(self):
         self._req_session.close()
 
+    @run_in_executor
     def _push_record(self, file_name, signature):
         hash_id = doc_key(self._dataset_id, file_name)
         self._solr_granules.delete(q=f"id:{hash_id}")
@@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory):
         self._solr_granules.commit()
         return None
 
+    @run_in_executor
     def _save_latest_timestamp(self):
         if self._solr_datasets:
             self._solr_datasets.delete(q=f"id:{self._dataset_id}")
@@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory):
         else:
             return None
 
+    @run_in_executor
     def _get_signature(self, file_name):
         hash_id = doc_key(self._dataset_id, file_name)
         results = self._solr_granules.search(q=f"id:{hash_id}")
@@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "granule_s", "string")
                 self._add_field(schema_endpoint, "granule_signature_s", "string")
 
-            else:
-                logger.info(f"collection {self._granule_collection_name} already exists")
-
             if self._dataset_collection_name not in existing_collections:
                 # Create collection
                 payload = {'action': 'CREATE',
@@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "dataset_s", "string")
                 self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
 
-            else:
-                logger.info(f"collection {self._dataset_collection_name} already exists")
-
         except requests.exceptions.RequestException as e:
             logger.error(f"solr instance unreachable {self._solr_url}")
             raise e
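
The run_in_executor decorator imported from common.async_utils.AsyncUtils is not included in this diff. A plausible sketch of such a decorator (an assumption, not the committed implementation): it wraps a blocking function so it runs in the event loop's default thread pool and can be awaited, which matches how the decorated _push_record, _save_latest_timestamp, and _get_signature are awaited in IngestionHistory.py even though their bodies make synchronous pysolr calls.

    import asyncio
    import functools

    def run_in_executor(func):
        # Assumed implementation: off-load a blocking call to the event
        # loop's default ThreadPoolExecutor so callers can await it.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None, functools.partial(func, *args, **kwargs))
        return wrapper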
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index ce1b577..2a57784 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add
 RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list
 RUN apt-get update && apt-get install -y kubectl
 
-COPY /collection_manager /collection_manager/collection_manager
-COPY /setup.py /collection_manager/setup.py
-COPY /requirements.txt /collection_manager/requirements.txt
-COPY /README.md /collection_manager/README.md
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY collection_manager/collection_manager /collection_manager/collection_manager
+COPY collection_manager/setup.py /collection_manager/setup.py
+COPY collection_manager/requirements.txt /collection_manager/requirements.txt
+COPY collection_manager/README.md /collection_manager/README.md
+COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
+RUN cd /common && python setup.py install
 RUN cd /collection_manager && python setup.py install
 
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
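
The COPY sources above are now relative to the repository root rather than to collection_manager/, so the image must be built with the repo root as the build context. An assumed invocation (the image tag is illustrative):

    cd incubator-sdap-ingester
    docker build -f collection_manager/docker/Dockerfile -t sdap/collection-manager:local .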
diff --git a/config_operator/config_operator/main.py b/config_operator/config_operator/main.py
index fbbbe6b..1df9cf6 100644
--- a/config_operator/config_operator/main.py
+++ b/config_operator/config_operator/main.py
@@ -4,6 +4,7 @@ import kopf
 from config_operator.config_source import RemoteGitConfig, LocalDirConfig
 from config_operator.k8s import K8sConfigMap
 
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index e098672..65f8b09 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,6 +21,7 @@ from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
+from common import AsyncUtils
 
 import pysolr
 from kazoo.handlers.threading import KazooTimeoutError