You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text rendering).
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/11 00:29:48 UTC

[incubator-sdap-ingester] branch async-history updated: wip

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/async-history by this push:
     new ed86c3d  wip
ed86c3d is described below

commit ed86c3d597d5dd5d788a4f16be5f1aa50d244a34
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Aug 10 17:29:32 2020 -0700

    wip
---
 .../services/CollectionWatcher.py                  |  2 --
 .../history_manager/FileIngestionHistory.py        |  6 +++---
 .../history_manager/test_FileIngestionHistory.py   | 24 ++++++++++++++--------
 .../tests/services/test_CollectionProcessor.py     |  2 +-
 .../granule_ingester/writers/SolrStore.py          | 18 +++++-----------
 5 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1fd1678..49acceb 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -111,8 +111,6 @@ class CollectionWatcher:
 
                 self._unschedule_watches()
                 self._schedule_watches()
-            else:
-                logger.info("No updated collections, so no files to scan")
         except CollectionConfigParsingError as e:
             logger.error(e)
 
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 140ae87..ffa065f 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -66,7 +66,7 @@ class FileIngestionHistory(IngestionHistory):
         except FileNotFoundError:
             logger.info(f"history cache {self._history_file_path} does not exist, does not need to be removed")
 
-    def _save_latest_timestamp(self):
+    async def _save_latest_timestamp(self):
         if self._latest_ingested_file_update:
             with open(self._latest_ingested_file_update_file_path, 'w') as f_ts:
                 f_ts.write(f'{str(self._latest_ingested_file_update)}\n')
@@ -90,10 +90,10 @@ class FileIngestionHistory(IngestionHistory):
         except FileNotFoundError:
             logger.info(f"no history file {self._history_file_path} to purge")
 
-    def _push_record(self, file_name, signature):
+    async def _push_record(self, file_name, signature):
         self._history_dict[file_name] = signature
 
         self._history_file.write(f'{file_name},{signature}\n')
 
-    def _get_signature(self, file_name):
+    async def _get_signature(self, file_name):
         return self._history_dict.get(file_name, None)
diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index d2ad45c..ff1fb3c 100644
--- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -3,25 +3,30 @@ import pathlib
 import tempfile
 import unittest
 
-from collection_manager.services.history_manager import FileIngestionHistory, md5sum_from_filepath
+from collection_manager.services.history_manager import (FileIngestionHistory,
+                                                         md5sum_from_filepath)
+
+from common.async_test_utils.AsyncTestUtils import async_test
 
 DATASET_ID = "zobi_la_mouche"
 
 
 class TestFileIngestionHistory(unittest.TestCase):
 
-    def test_get_md5sum(self):
+    @async_test
+    async def test_get_md5sum(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
-            ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
-            result = ingestion_history._get_signature("blue")
+            await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+            result = await ingestion_history._get_signature("blue")
             self.assertEqual(result, "12weeukrhbwerqu7wier")
 
-    def test_get_missing_md5sum(self):
+    @async_test
+    async def test_get_missing_md5sum(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
-            ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
-            result = ingestion_history._get_signature("green")
+            await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+            result = await ingestion_history._get_signature("green")
             self.assertIsNone(result)
 
     def test_already_ingested(self):
@@ -44,12 +49,13 @@ class TestFileIngestionHistory(unittest.TestCase):
 
             del ingestion_history
 
-    def test_already_ingested_is_false(self):
+    @async_test
+    async def test_already_ingested_is_false(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            self.assertFalse(ingestion_history.already_ingested(str(current_file_path)))
+            self.assertFalse(await ingestion_history.already_ingested(str(current_file_path)))
 
 
 if __name__ == '__main__':
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index aa143f3..fec4726 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -114,7 +114,7 @@ class TestCollectionProcessor(unittest.TestCase):
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
     @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
     async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
-                                                                             mock_history_builder, mock_history):
+                                                                                   mock_history_builder, mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
         mock_history_builder.build.return_value = mock_history
 
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 65f8b09..67532b5 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,28 +21,20 @@ from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
-from common import AsyncUtils
 
 import pysolr
-from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.exceptions import NoNodeError
-from nexusproto.DataTile_pb2 import TileSummary, NexusTile
+from kazoo.handlers.threading import KazooTimeoutError
 
-from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
+from common.async_utils.AsyncUtils import run_in_executor
+from granule_ingester.exceptions import (SolrFailedHealthCheckError,
+                                         SolrLostConnectionError)
 from granule_ingester.writers.MetadataStore import MetadataStore
+from nexusproto.DataTile_pb2 import NexusTile, TileSummary
 
 logger = logging.getLogger(__name__)
 
 
-def run_in_executor(f):
-    @functools.wraps(f)
-    def inner(*args, **kwargs):
-        loop = asyncio.get_running_loop()
-        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
-
-    return inner
-
-
 class SolrStore(MetadataStore):
     def __init__(self, solr_url=None, zk_url=None):
         super().__init__()