You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/11 00:29:48 UTC
[incubator-sdap-ingester] branch async-history updated: wip
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/async-history by this push:
new ed86c3d wip
ed86c3d is described below
commit ed86c3d597d5dd5d788a4f16be5f1aa50d244a34
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Aug 10 17:29:32 2020 -0700
wip
---
.../services/CollectionWatcher.py | 2 --
.../history_manager/FileIngestionHistory.py | 6 +++---
.../history_manager/test_FileIngestionHistory.py | 24 ++++++++++++++--------
.../tests/services/test_CollectionProcessor.py | 2 +-
.../granule_ingester/writers/SolrStore.py | 18 +++++-----------
5 files changed, 24 insertions(+), 28 deletions(-)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1fd1678..49acceb 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -111,8 +111,6 @@ class CollectionWatcher:
self._unschedule_watches()
self._schedule_watches()
- else:
- logger.info("No updated collections, so no files to scan")
except CollectionConfigParsingError as e:
logger.error(e)
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 140ae87..ffa065f 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -66,7 +66,7 @@ class FileIngestionHistory(IngestionHistory):
except FileNotFoundError:
logger.info(f"history cache {self._history_file_path} does not exist, does not need to be removed")
- def _save_latest_timestamp(self):
+ async def _save_latest_timestamp(self):
if self._latest_ingested_file_update:
with open(self._latest_ingested_file_update_file_path, 'w') as f_ts:
f_ts.write(f'{str(self._latest_ingested_file_update)}\n')
@@ -90,10 +90,10 @@ class FileIngestionHistory(IngestionHistory):
except FileNotFoundError:
logger.info(f"no history file {self._history_file_path} to purge")
- def _push_record(self, file_name, signature):
+ async def _push_record(self, file_name, signature):
self._history_dict[file_name] = signature
self._history_file.write(f'{file_name},{signature}\n')
- def _get_signature(self, file_name):
+ async def _get_signature(self, file_name):
return self._history_dict.get(file_name, None)
diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index d2ad45c..ff1fb3c 100644
--- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -3,25 +3,30 @@ import pathlib
import tempfile
import unittest
-from collection_manager.services.history_manager import FileIngestionHistory, md5sum_from_filepath
+from collection_manager.services.history_manager import (FileIngestionHistory,
+ md5sum_from_filepath)
+
+from common.async_test_utils.AsyncTestUtils import async_test
DATASET_ID = "zobi_la_mouche"
class TestFileIngestionHistory(unittest.TestCase):
- def test_get_md5sum(self):
+ @async_test
+ async def test_get_md5sum(self):
with tempfile.TemporaryDirectory() as history_dir:
ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
- ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
- result = ingestion_history._get_signature("blue")
+ await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+ result = await ingestion_history._get_signature("blue")
self.assertEqual(result, "12weeukrhbwerqu7wier")
- def test_get_missing_md5sum(self):
+ @async_test
+ async def test_get_missing_md5sum(self):
with tempfile.TemporaryDirectory() as history_dir:
ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
- ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
- result = ingestion_history._get_signature("green")
+ await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+ result = await ingestion_history._get_signature("green")
self.assertIsNone(result)
def test_already_ingested(self):
@@ -44,12 +49,13 @@ class TestFileIngestionHistory(unittest.TestCase):
del ingestion_history
- def test_already_ingested_is_false(self):
+ @async_test
+ async def test_already_ingested_is_false(self):
with tempfile.TemporaryDirectory() as history_dir:
ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
# history_manager with this file
current_file_path = pathlib.Path(__file__)
- self.assertFalse(ingestion_history.already_ingested(str(current_file_path)))
+ self.assertFalse(await ingestion_history.already_ingested(str(current_file_path)))
if __name__ == '__main__':
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index aa143f3..fec4726 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -114,7 +114,7 @@ class TestCollectionProcessor(unittest.TestCase):
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
@mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
- mock_history_builder, mock_history):
+ mock_history_builder, mock_history):
mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
mock_history_builder.build.return_value = mock_history
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 65f8b09..67532b5 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,28 +21,20 @@ from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
from typing import Dict
-from common import AsyncUtils
import pysolr
-from kazoo.handlers.threading import KazooTimeoutError
from kazoo.exceptions import NoNodeError
-from nexusproto.DataTile_pb2 import TileSummary, NexusTile
+from kazoo.handlers.threading import KazooTimeoutError
-from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
+from common.async_utils.AsyncUtils import run_in_executor
+from granule_ingester.exceptions import (SolrFailedHealthCheckError,
+ SolrLostConnectionError)
from granule_ingester.writers.MetadataStore import MetadataStore
+from nexusproto.DataTile_pb2 import NexusTile, TileSummary
logger = logging.getLogger(__name__)
-def run_in_executor(f):
- @functools.wraps(f)
- def inner(*args, **kwargs):
- loop = asyncio.get_running_loop()
- return loop.run_in_executor(None, lambda: f(*args, **kwargs))
-
- return inner
-
-
class SolrStore(MetadataStore):
def __init__(self, solr_url=None, zk_url=None):
super().__init__()