You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/11 18:45:36 UTC
[incubator-sdap-ingester] branch dev updated: SDAP-280: Collection
Manager to talk to Ingestion History asynchronously (#16)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new bc75c25 SDAP-280: Collection Manager to talk to Ingestion History asynchronously (#16)
bc75c25 is described below
commit bc75c25c9e52b39b9c4730272e88413743c9e9cb
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 11 11:45:26 2020 -0700
SDAP-280: Collection Manager to talk to Ingestion History asynchronously (#16)
---
collection_manager/README.md | 14 ++++----
.../services/CollectionProcessor.py | 8 ++---
.../services/CollectionWatcher.py | 15 +++++----
.../history_manager/FileIngestionHistory.py | 9 ++---
.../services/history_manager/IngestionHistory.py | 26 +++++++--------
.../history_manager/SolrIngestionHistory.py | 11 +++----
collection_manager/docker/Dockerfile | 12 ++++---
collection_manager/requirements.txt | 2 +-
.../history_manager/test_FileIngestionHistory.py | 38 +++++++++++++---------
.../tests/services/test_CollectionProcessor.py | 16 +++++----
.../tests/services/test_CollectionWatcher.py | 1 -
common/common/async_utils/AsyncUtils.py | 11 +++++++
common/common/async_utils/__init__.py | 1 +
granule_ingester/README.md | 21 ++++++------
granule_ingester/docker/Dockerfile | 14 ++++----
.../granule_ingester/writers/SolrStore.py | 17 +++-------
16 files changed, 118 insertions(+), 98 deletions(-)
diff --git a/collection_manager/README.md b/collection_manager/README.md
index 771f355..84df468 100644
--- a/collection_manager/README.md
+++ b/collection_manager/README.md
@@ -12,15 +12,15 @@ Manager service will publish a message to RabbitMQ to be picked up by the Granul
Python 3.7
## Building the service
-From `incubator-sdap-ingester/collection_manager`, run:
-
- $ python setup.py install
+From `incubator-sdap-ingester`, run:
+ $ cd common && python setup.py install
+ $ cd ../collection_manager && python setup.py install
## Running the service
-From `incubator-sdap-ingester/collection_manager`, run:
+From `incubator-sdap-ingester`, run:
- $ python collection_manager/main.py -h
+ $ python collection_manager/collection_manager/main.py -h
### The Collections Configuration File
@@ -71,6 +71,6 @@ From `incubator-sdap-ingester/`, run:
$ pip install pytest && pytest
## Building the Docker image
-From `incubator-sdap-ingester/collection_manager`, run:
+From `incubator-sdap-ingester`, run:
- $ docker build . -f docker/Dockerfile -t nexusjpl/collection-manager
+ $ docker build . -f collection_manager/docker/Dockerfile -t nexusjpl/collection-manager
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index d790f4b..ac61586 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -45,7 +45,7 @@ class CollectionProcessor:
return
history_manager = self._get_history_manager(collection.dataset_id)
- granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+ granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -59,13 +59,13 @@ class CollectionProcessor:
f"'{collection.dataset_id}'.")
use_priority = collection.historical_priority
else:
- logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
- f"time range for collection '{collection.dataset_id}'. Skipping.")
+ logger.debug(f"Granule '{granule}' detected but has already been ingested in "
+ f"collection '{collection.dataset_id}'. Skipping.")
return
dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
await self._publisher.publish_message(body=dataset_config, priority=use_priority)
- history_manager.push(granule)
+ await history_manager.push(granule)
@staticmethod
def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 8f67e16..54c8877 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,18 +1,19 @@
import asyncio
-import time
import logging
import os
+import time
from collections import defaultdict
-from typing import Dict, Callable, Set, Optional, Awaitable
+from typing import Awaitable, Callable, Dict, Optional, Set
+
import yaml
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import (
+ CollectionConfigFileNotFoundError, CollectionConfigParsingError,
+ ConflictingPathCollectionError, MissingValueCollectionError,
+ RelativePathCollectionError, RelativePathError)
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver as Observer
-from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
- CollectionConfigFileNotFoundError, MissingValueCollectionError, ConflictingPathCollectionError, \
- RelativePathCollectionError
-
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 50f2170..ffa065f 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory):
Constructor
:param history_path:
:param dataset_id:
- :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp)
+ :param signature_fun: function which creates the signature of the cache,
+ a file path string as argument and returns a string (md5sum, time stamp)
"""
self._dataset_id = dataset_id
self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
@@ -65,7 +66,7 @@ class FileIngestionHistory(IngestionHistory):
except FileNotFoundError:
logger.info(f"history cache {self._history_file_path} does not exist, does not need to be removed")
- def _save_latest_timestamp(self):
+ async def _save_latest_timestamp(self):
if self._latest_ingested_file_update:
with open(self._latest_ingested_file_update_file_path, 'w') as f_ts:
f_ts.write(f'{str(self._latest_ingested_file_update)}\n')
@@ -89,10 +90,10 @@ class FileIngestionHistory(IngestionHistory):
except FileNotFoundError:
logger.info(f"no history file {self._history_file_path} to purge")
- def _push_record(self, file_name, signature):
+ async def _push_record(self, file_name, signature):
self._history_dict[file_name] = signature
self._history_file.write(f'{file_name},{signature}\n')
- def _get_signature(self, file_name):
+ async def _get_signature(self, file_name):
return self._history_dict.get(file_name, None)
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d92cb24..ef73ccb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -37,7 +37,7 @@ class IngestionHistory(ABC):
_signature_fun = None
_latest_ingested_file_update = None
- def push(self, file_path: str):
+ async def push(self, file_path: str):
"""
Record a file as having been ingested.
:param file_path: The full path to the file to record.
@@ -46,14 +46,14 @@ class IngestionHistory(ABC):
file_path = file_path.strip()
file_name = os.path.basename(file_path)
signature = self._signature_fun(file_path)
- self._push_record(file_name, signature)
+ await self._push_record(file_name, signature)
if not self._latest_ingested_file_update:
self._latest_ingested_file_update = os.path.getmtime(file_path)
else:
self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
- self._save_latest_timestamp()
+ await self._save_latest_timestamp()
def latest_ingested_mtime(self) -> Optional[datetime]:
"""
@@ -65,7 +65,7 @@ class IngestionHistory(ABC):
else:
return None
- def already_ingested(self, file_path: str) -> bool:
+ async def already_ingested(self, file_path: str) -> bool:
"""
Return a boolean indicating whether the specified file has already been ingested, based on its signature.
:param file_path: The full path of a file to search for in the history.
@@ -74,12 +74,12 @@ class IngestionHistory(ABC):
file_path = file_path.strip()
file_name = os.path.basename(file_path)
signature = self._signature_fun(file_path)
- return signature == self._get_signature(file_name)
+ return signature == await self._get_signature(file_name)
- def get_granule_status(self,
- file_path: str,
- date_from: datetime = None,
- date_to: datetime = None) -> GranuleStatus:
+ async def get_granule_status(self,
+ file_path: str,
+ date_from: datetime = None,
+ date_to: datetime = None) -> GranuleStatus:
"""
Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested
and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime).
@@ -96,21 +96,21 @@ class IngestionHistory(ABC):
"""
if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
return GranuleStatus.DESIRED_FORWARD_PROCESSING
- elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path):
+ elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
return GranuleStatus.DESIRED_HISTORICAL
else:
return GranuleStatus.UNDESIRED
@abstractmethod
- def _save_latest_timestamp(self):
+ async def _save_latest_timestamp(self):
pass
@abstractmethod
- def _push_record(self, file_name, signature):
+ async def _push_record(self, file_name, signature):
pass
@abstractmethod
- def _get_signature(self, file_name):
+ async def _get_signature(self, file_name):
pass
@staticmethod
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 79d6eef..59f5cd7 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -4,10 +4,12 @@ import logging
import pysolr
import requests
+from common.async_utils.AsyncUtils import run_in_executor
from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
+logging.getLogger("pysolr").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory):
def __del__(self):
self._req_session.close()
+ @run_in_executor
def _push_record(self, file_name, signature):
hash_id = doc_key(self._dataset_id, file_name)
self._solr_granules.delete(q=f"id:{hash_id}")
@@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory):
self._solr_granules.commit()
return None
+ @run_in_executor
def _save_latest_timestamp(self):
if self._solr_datasets:
self._solr_datasets.delete(q=f"id:{self._dataset_id}")
@@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory):
else:
return None
+ @run_in_executor
def _get_signature(self, file_name):
hash_id = doc_key(self._dataset_id, file_name)
results = self._solr_granules.search(q=f"id:{hash_id}")
@@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory):
self._add_field(schema_endpoint, "granule_s", "string")
self._add_field(schema_endpoint, "granule_signature_s", "string")
- else:
- logger.info(f"collection {self._granule_collection_name} already exists")
-
if self._dataset_collection_name not in existing_collections:
# Create collection
payload = {'action': 'CREATE',
@@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory):
self._add_field(schema_endpoint, "dataset_s", "string")
self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
- else:
- logger.info(f"collection {self._dataset_collection_name} already exists")
-
except requests.exceptions.RequestException as e:
logger.error(f"solr instance unreachable {self._solr_url}")
raise e
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index ce1b577..2a57784 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add
RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list
RUN apt-get update && apt-get install -y kubectl
-COPY /collection_manager /collection_manager/collection_manager
-COPY /setup.py /collection_manager/setup.py
-COPY /requirements.txt /collection_manager/requirements.txt
-COPY /README.md /collection_manager/README.md
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY collection_manager/collection_manager /collection_manager/collection_manager
+COPY collection_manager/setup.py /collection_manager/setup.py
+COPY collection_manager/requirements.txt /collection_manager/requirements.txt
+COPY collection_manager/README.md /collection_manager/README.md
+COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
+RUN cd /common && python setup.py install
RUN cd /collection_manager && python setup.py install
ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index 7e47c51..ee12c89 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -1,6 +1,6 @@
PyYAML==5.3.1
pystache==0.5.4
-pysolr==3.8.1
+pysolr==3.9.0
watchdog==0.10.2
requests==2.23.0
aio-pika==6.6.1
diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index d2ad45c..07ab0e1 100644
--- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -3,53 +3,61 @@ import pathlib
import tempfile
import unittest
-from collection_manager.services.history_manager import FileIngestionHistory, md5sum_from_filepath
+from collection_manager.services.history_manager import (FileIngestionHistory,
+ md5sum_from_filepath)
+
+from common.async_test_utils.AsyncTestUtils import async_test
DATASET_ID = "zobi_la_mouche"
class TestFileIngestionHistory(unittest.TestCase):
- def test_get_md5sum(self):
+ @async_test
+ async def test_get_md5sum(self):
with tempfile.TemporaryDirectory() as history_dir:
ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
- ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
- result = ingestion_history._get_signature("blue")
+ await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+ result = await ingestion_history._get_signature("blue")
self.assertEqual(result, "12weeukrhbwerqu7wier")
- def test_get_missing_md5sum(self):
+ @async_test
+ async def test_get_missing_md5sum(self):
with tempfile.TemporaryDirectory() as history_dir:
ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
- ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
- result = ingestion_history._get_signature("green")
+ await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+ result = await ingestion_history._get_signature("green")
self.assertIsNone(result)
- def test_already_ingested(self):
+ @async_test
+ async def test_already_ingested(self):
with tempfile.TemporaryDirectory() as history_dir:
ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
# history_manager with this file
current_file_path = pathlib.Path(__file__)
- ingestion_history.push(str(current_file_path))
- self.assertTrue(ingestion_history.already_ingested(str(current_file_path)))
+ await ingestion_history.push(str(current_file_path))
+ self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
del ingestion_history
- def test_already_ingested_with_latest_modifcation_signature(self):
+ @async_test
+ async def test_already_ingested_with_latest_modifcation_signature(self):
with tempfile.TemporaryDirectory() as history_dir:
ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, os.path.getmtime)
# history_manager with this file
current_file_path = pathlib.Path(__file__)
- ingestion_history.push(str(current_file_path))
- self.assertTrue(ingestion_history.already_ingested(str(current_file_path)))
+ await ingestion_history.push(str(current_file_path))
+ self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
del ingestion_history
- def test_already_ingested_is_false(self):
+ @async_test
+ async def test_already_ingested_is_false(self):
with tempfile.TemporaryDirectory() as history_dir:
ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
# history_manager with this file
current_file_path = pathlib.Path(__file__)
- self.assertFalse(ingestion_history.already_ingested(str(current_file_path)))
+ self.assertFalse(await ingestion_history.already_ingested(str(current_file_path)))
if __name__ == '__main__':
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index aa143f3..a7059d6 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -65,7 +65,7 @@ class TestCollectionProcessor(unittest.TestCase):
self.assertEqual(filled, expected)
@async_test
- @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+ @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
@mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
async def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history):
@@ -87,10 +87,12 @@ class TestCollectionProcessor(unittest.TestCase):
mock_history.push.assert_called()
@async_test
- @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
- @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
+ @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
+ @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
@mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
- async def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder,
+ async def test_process_granule_with_forward_processing_granule(self,
+ mock_publisher,
+ mock_history_builder,
mock_history):
mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
mock_history_builder.build.return_value = mock_history
@@ -110,11 +112,11 @@ class TestCollectionProcessor(unittest.TestCase):
mock_history.push.assert_called()
@async_test
- @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+ @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
@mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
- mock_history_builder, mock_history):
+ mock_history_builder, mock_history):
mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
mock_history_builder.build.return_value = mock_history
@@ -132,7 +134,7 @@ class TestCollectionProcessor(unittest.TestCase):
mock_history.push.assert_called()
@async_test
- @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+ @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
@mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
async def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history):
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index 14e7c3c..c9a75c0 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -210,4 +210,3 @@ collections:
callback = AsyncMock()
await CollectionWatcher._run_periodically(None, 0.1, callback)
await AsyncAssert.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2)
-
diff --git a/common/common/async_utils/AsyncUtils.py b/common/common/async_utils/AsyncUtils.py
new file mode 100644
index 0000000..5fefd45
--- /dev/null
+++ b/common/common/async_utils/AsyncUtils.py
@@ -0,0 +1,11 @@
+import asyncio
+import functools
+
+
+def run_in_executor(f):
+ @functools.wraps(f)
+ def inner(*args, **kwargs):
+ loop = asyncio.get_running_loop()
+ return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+ return inner
diff --git a/common/common/async_utils/__init__.py b/common/common/async_utils/__init__.py
new file mode 100644
index 0000000..9a468e0
--- /dev/null
+++ b/common/common/async_utils/__init__.py
@@ -0,0 +1 @@
+from .AsyncUtils import run_in_executor
diff --git a/granule_ingester/README.md b/granule_ingester/README.md
index 112f52d..1339835 100644
--- a/granule_ingester/README.md
+++ b/granule_ingester/README.md
@@ -12,23 +12,24 @@ data to Cassandra and Solr.
Python 3.7
## Building the service
-From `incubator-sdap-ingester/granule_ingester`, run:
-
- $ python setup.py install
+From `incubator-sdap-ingester`, run:
+ $ cd common && python setup.py install
+ $ cd ../granule_ingester && python setup.py install
## Launching the service
-From `incubator-sdap-ingester/granule_ingester`, run:
+From `incubator-sdap-ingester`, run:
- $ python granule_ingester/main.py -h
+ $ python granule_ingester/granule_ingester/main.py -h
## Running the tests
-From `incubator-sdap-ingester/granule_ingester`, run:
+From `incubator-sdap-ingester`, run:
- $ pip install pytest
- $ pytest
+ $ cd common && python setup.py install
+ $ cd ../granule_ingester && python setup.py install
+ $ pip install pytest && pytest
## Building the Docker image
-From `incubator-sdap-ingester/granule_ingester`, run:
+From `incubator-sdap-ingester`, run:
- $ docker build . -f docker/Dockerfile -t nexusjpl/granule-ingester
+ $ docker build . -f granule_ingester/docker/Dockerfile -t nexusjpl/granule-ingester
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
index 4b25318..57bacff 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -6,14 +6,16 @@ ENV PATH="/opt/conda/bin:$PATH"
RUN apk update --no-cache && apk add --no-cache --virtual .build-deps git openjdk8
-COPY /granule_ingester /sdap/granule_ingester
-COPY /setup.py /sdap/setup.py
-COPY /requirements.txt /sdap/requirements.txt
-COPY /conda-requirements.txt /sdap/conda-requirements.txt
-COPY /docker/install_nexusproto.sh /install_nexusproto.sh
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY granule_ingester/granule_ingester /sdap/granule_ingester
+COPY granule_ingester/setup.py /sdap/setup.py
+COPY granule_ingester/requirements.txt /sdap/requirements.txt
+COPY granule_ingester/conda-requirements.txt /sdap/conda-requirements.txt
+COPY granule_ingester/docker/install_nexusproto.sh /install_nexusproto.sh
+COPY granule_ingester/docker/entrypoint.sh /entrypoint.sh
RUN ./install_nexusproto.sh
+RUN cd /common && python setup.py install
RUN cd /sdap && python setup.py install
RUN apk del .build-deps
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index e098672..67532b5 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -23,25 +23,18 @@ from pathlib import Path
from typing import Dict
import pysolr
-from kazoo.handlers.threading import KazooTimeoutError
from kazoo.exceptions import NoNodeError
-from nexusproto.DataTile_pb2 import TileSummary, NexusTile
+from kazoo.handlers.threading import KazooTimeoutError
-from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
+from common.async_utils.AsyncUtils import run_in_executor
+from granule_ingester.exceptions import (SolrFailedHealthCheckError,
+ SolrLostConnectionError)
from granule_ingester.writers.MetadataStore import MetadataStore
+from nexusproto.DataTile_pb2 import NexusTile, TileSummary
logger = logging.getLogger(__name__)
-def run_in_executor(f):
- @functools.wraps(f)
- def inner(*args, **kwargs):
- loop = asyncio.get_running_loop()
- return loop.run_in_executor(None, lambda: f(*args, **kwargs))
-
- return inner
-
-
class SolrStore(MetadataStore):
def __init__(self, solr_url=None, zk_url=None):
super().__init__()