Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/11 18:45:36 UTC

[incubator-sdap-ingester] branch dev updated: SDAP-280: Collection Manager to talk to Ingestion History asynchronously (#16)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new bc75c25  SDAP-280: Collection Manager to talk to Ingestion History asynchronously (#16)
bc75c25 is described below

commit bc75c25c9e52b39b9c4730272e88413743c9e9cb
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 11 11:45:26 2020 -0700

    SDAP-280: Collection Manager to talk to Ingestion History asynchronously (#16)
---
 collection_manager/README.md                       | 14 ++++----
 .../services/CollectionProcessor.py                |  8 ++---
 .../services/CollectionWatcher.py                  | 15 +++++----
 .../history_manager/FileIngestionHistory.py        |  9 ++---
 .../services/history_manager/IngestionHistory.py   | 26 +++++++--------
 .../history_manager/SolrIngestionHistory.py        | 11 +++----
 collection_manager/docker/Dockerfile               | 12 ++++---
 collection_manager/requirements.txt                |  2 +-
 .../history_manager/test_FileIngestionHistory.py   | 38 +++++++++++++---------
 .../tests/services/test_CollectionProcessor.py     | 16 +++++----
 .../tests/services/test_CollectionWatcher.py       |  1 -
 common/common/async_utils/AsyncUtils.py            | 11 +++++++
 common/common/async_utils/__init__.py              |  1 +
 granule_ingester/README.md                         | 21 ++++++------
 granule_ingester/docker/Dockerfile                 | 14 ++++----
 .../granule_ingester/writers/SolrStore.py          | 17 +++-------
 16 files changed, 118 insertions(+), 98 deletions(-)

diff --git a/collection_manager/README.md b/collection_manager/README.md
index 771f355..84df468 100644
--- a/collection_manager/README.md
+++ b/collection_manager/README.md
@@ -12,15 +12,15 @@ Manager service will publish a message to RabbitMQ to be picked up by the Granul
 Python 3.7
 
 ## Building the service
-From `incubator-sdap-ingester/collection_manager`, run:
-
-    $ python setup.py install
+From `incubator-sdap-ingester`, run:
+    $ cd common && python setup.py install
+    $ cd ../collection_manager && python setup.py install
     
 
 ## Running the service
-From `incubator-sdap-ingester/collection_manager`, run:
+From `incubator-sdap-ingester`, run:
 
-    $ python collection_manager/main.py -h
+    $ python collection_manager/collection_manager/main.py -h
     
 ### The Collections Configuration File
 
@@ -71,6 +71,6 @@ From `incubator-sdap-ingester/`, run:
     $ pip install pytest && pytest
     
 ## Building the Docker image
-From `incubator-sdap-ingester/collection_manager`, run:
+From `incubator-sdap-ingester`, run:
 
-    $ docker build . -f docker/Dockerfile -t nexusjpl/collection-manager
+    $ docker build . -f collection_manager/docker/Dockerfile -t nexusjpl/collection-manager
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index d790f4b..ac61586 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -45,7 +45,7 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -59,13 +59,13 @@ class CollectionProcessor:
                         f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
-                        f"time range for collection '{collection.dataset_id}'. Skipping.")
+            logger.debug(f"Granule '{granule}' detected but has already been ingested in "
+                         f"collection '{collection.dataset_id}'. Skipping.")
             return
 
         dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        history_manager.push(granule)
+        await history_manager.push(granule)
 
     @staticmethod
     def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 8f67e16..54c8877 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,18 +1,19 @@
 import asyncio
-import time
 import logging
 import os
+import time
 from collections import defaultdict
-from typing import Dict, Callable, Set, Optional, Awaitable
+from typing import Awaitable, Callable, Dict, Optional, Set
+
 import yaml
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import (
+    CollectionConfigFileNotFoundError, CollectionConfigParsingError,
+    ConflictingPathCollectionError, MissingValueCollectionError,
+    RelativePathCollectionError, RelativePathError)
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
-from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
-    CollectionConfigFileNotFoundError, MissingValueCollectionError, ConflictingPathCollectionError, \
-    RelativePathCollectionError
-
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
 
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 50f2170..ffa065f 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory):
         Constructor
         :param history_path:
         :param dataset_id:
-        :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp)
+        :param signature_fun: function which creates the signature of the cache;
+                              takes a file path string as argument and returns a string (md5sum, time stamp)
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
@@ -65,7 +66,7 @@ class FileIngestionHistory(IngestionHistory):
         except FileNotFoundError:
             logger.info(f"history cache {self._history_file_path} does not exist, does not need to be removed")
 
-    def _save_latest_timestamp(self):
+    async def _save_latest_timestamp(self):
         if self._latest_ingested_file_update:
             with open(self._latest_ingested_file_update_file_path, 'w') as f_ts:
                 f_ts.write(f'{str(self._latest_ingested_file_update)}\n')
@@ -89,10 +90,10 @@ class FileIngestionHistory(IngestionHistory):
         except FileNotFoundError:
             logger.info(f"no history file {self._history_file_path} to purge")
 
-    def _push_record(self, file_name, signature):
+    async def _push_record(self, file_name, signature):
         self._history_dict[file_name] = signature
 
         self._history_file.write(f'{file_name},{signature}\n')
 
-    def _get_signature(self, file_name):
+    async def _get_signature(self, file_name):
         return self._history_dict.get(file_name, None)
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d92cb24..ef73ccb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -37,7 +37,7 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None
 
-    def push(self, file_path: str):
+    async def push(self, file_path: str):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
@@ -46,14 +46,14 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        self._push_record(file_name, signature)
+        await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
             self._latest_ingested_file_update = os.path.getmtime(file_path)
         else:
             self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
 
-        self._save_latest_timestamp()
+        await self._save_latest_timestamp()
 
     def latest_ingested_mtime(self) -> Optional[datetime]:
         """
@@ -65,7 +65,7 @@ class IngestionHistory(ABC):
         else:
             return None
 
-    def already_ingested(self, file_path: str) -> bool:
+    async def already_ingested(self, file_path: str) -> bool:
         """
         Return a boolean indicating whether the specified file has already been ingested, based on its signature.
         :param file_path: The full path of a file to search for in the history.
@@ -74,12 +74,12 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        return signature == self._get_signature(file_name)
+        return signature == await self._get_signature(file_name)
 
-    def get_granule_status(self,
-                           file_path: str,
-                           date_from: datetime = None,
-                           date_to: datetime = None) -> GranuleStatus:
+    async def get_granule_status(self,
+                                 file_path: str,
+                                 date_from: datetime = None,
+                                 date_to: datetime = None) -> GranuleStatus:
         """
         Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested
         and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime).
@@ -96,21 +96,21 @@ class IngestionHistory(ABC):
         """
         if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path):
+        elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
     @abstractmethod
-    def _save_latest_timestamp(self):
+    async def _save_latest_timestamp(self):
         pass
 
     @abstractmethod
-    def _push_record(self, file_name, signature):
+    async def _push_record(self, file_name, signature):
         pass
 
     @abstractmethod
-    def _get_signature(self, file_name):
+    async def _get_signature(self, file_name):
         pass
 
     @staticmethod
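
For context, the async contract these abstract methods define can be seen in a
minimal self-contained sketch (not SDAP code; the class is hypothetical and the
signatures are simplified): subclasses implement the hook coroutines, and
callers must now await push() and already_ingested().

    import asyncio

    class InMemoryHistory:
        # Hypothetical stand-in for an IngestionHistory subclass.
        def __init__(self):
            self._records = {}

        async def _push_record(self, file_name, signature):
            self._records[file_name] = signature

        async def _get_signature(self, file_name):
            return self._records.get(file_name)

        async def push(self, file_name, signature):
            await self._push_record(file_name, signature)

        async def already_ingested(self, file_name, signature):
            return signature == await self._get_signature(file_name)

    async def main():
        history = InMemoryHistory()
        await history.push("blue.nc", "abc123")
        print(await history.already_ingested("blue.nc", "abc123"))  # True

    asyncio.run(main())
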
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 79d6eef..59f5cd7 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -4,10 +4,12 @@ import logging
 import pysolr
 import requests
 
+from common.async_utils.AsyncUtils import run_in_executor
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
 from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
+logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
 
@@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory):
     def __del__(self):
         self._req_session.close()
 
+    @run_in_executor
     def _push_record(self, file_name, signature):
         hash_id = doc_key(self._dataset_id, file_name)
         self._solr_granules.delete(q=f"id:{hash_id}")
@@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory):
         self._solr_granules.commit()
         return None
 
+    @run_in_executor
     def _save_latest_timestamp(self):
         if self._solr_datasets:
             self._solr_datasets.delete(q=f"id:{self._dataset_id}")
@@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory):
         else:
             return None
 
+    @run_in_executor
     def _get_signature(self, file_name):
         hash_id = doc_key(self._dataset_id, file_name)
         results = self._solr_granules.search(q=f"id:{hash_id}")
@@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "granule_s", "string")
                 self._add_field(schema_endpoint, "granule_signature_s", "string")
 
-            else:
-                logger.info(f"collection {self._granule_collection_name} already exists")
-
             if self._dataset_collection_name not in existing_collections:
                 # Create collection
                 payload = {'action': 'CREATE',
@@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "dataset_s", "string")
                 self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
 
-            else:
-                logger.info(f"collection {self._dataset_collection_name} already exists")
-
         except requests.exceptions.RequestException as e:
             logger.error(f"solr instance unreachable {self._solr_url}")
             raise e
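
Note the pattern above: the method bodies stay synchronous, because pysolr is a
blocking client, and the new @run_in_executor decorator hands each call to the
event loop's default thread-pool executor, which keeps the awaitable signatures
of IngestionHistory satisfied. A minimal sketch of the same idea, assuming the
common package from this commit is installed (the class and its in-memory
"collection" are hypothetical):

    import asyncio

    from common.async_utils.AsyncUtils import run_in_executor

    class FakeSolrHistory:
        def __init__(self):
            self._docs = {"blue.nc": "abc123"}  # stands in for a Solr collection

        @run_in_executor
        def _get_signature(self, file_name):
            # A blocking self._solr_granules.search(...) call in the real code.
            return self._docs.get(file_name)

    async def main():
        history = FakeSolrHistory()
        print(await history._get_signature("blue.nc"))  # -> abc123

    asyncio.run(main())
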
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index ce1b577..2a57784 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add
 RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list
 RUN apt-get update && apt-get install -y kubectl
 
-COPY /collection_manager /collection_manager/collection_manager
-COPY /setup.py /collection_manager/setup.py
-COPY /requirements.txt /collection_manager/requirements.txt
-COPY /README.md /collection_manager/README.md
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY collection_manager/collection_manager /collection_manager/collection_manager
+COPY collection_manager/setup.py /collection_manager/setup.py
+COPY collection_manager/requirements.txt /collection_manager/requirements.txt
+COPY collection_manager/README.md /collection_manager/README.md
+COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
+RUN cd /common && python setup.py install
 RUN cd /collection_manager && python setup.py install
 
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index 7e47c51..ee12c89 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -1,6 +1,6 @@
 PyYAML==5.3.1
 pystache==0.5.4
-pysolr==3.8.1
+pysolr==3.9.0
 watchdog==0.10.2
 requests==2.23.0
 aio-pika==6.6.1
diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index d2ad45c..07ab0e1 100644
--- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -3,53 +3,61 @@ import pathlib
 import tempfile
 import unittest
 
-from collection_manager.services.history_manager import FileIngestionHistory, md5sum_from_filepath
+from collection_manager.services.history_manager import (FileIngestionHistory,
+                                                         md5sum_from_filepath)
+
+from common.async_test_utils.AsyncTestUtils import async_test
 
 DATASET_ID = "zobi_la_mouche"
 
 
 class TestFileIngestionHistory(unittest.TestCase):
 
-    def test_get_md5sum(self):
+    @async_test
+    async def test_get_md5sum(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
-            ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
-            result = ingestion_history._get_signature("blue")
+            await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+            result = await ingestion_history._get_signature("blue")
             self.assertEqual(result, "12weeukrhbwerqu7wier")
 
-    def test_get_missing_md5sum(self):
+    @async_test
+    async def test_get_missing_md5sum(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
-            ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
-            result = ingestion_history._get_signature("green")
+            await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+            result = await ingestion_history._get_signature("green")
             self.assertIsNone(result)
 
-    def test_already_ingested(self):
+    @async_test
+    async def test_already_ingested(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            ingestion_history.push(str(current_file_path))
-            self.assertTrue(ingestion_history.already_ingested(str(current_file_path)))
+            await ingestion_history.push(str(current_file_path))
+            self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
 
             del ingestion_history
 
-    def test_already_ingested_with_latest_modifcation_signature(self):
+    @async_test
+    async def test_already_ingested_with_latest_modifcation_signature(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, os.path.getmtime)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            ingestion_history.push(str(current_file_path))
-            self.assertTrue(ingestion_history.already_ingested(str(current_file_path)))
+            await ingestion_history.push(str(current_file_path))
+            self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
 
             del ingestion_history
 
-    def test_already_ingested_is_false(self):
+    @async_test
+    async def test_already_ingested_is_false(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            self.assertFalse(ingestion_history.already_ingested(str(current_file_path)))
+            self.assertFalse(await ingestion_history.already_ingested(str(current_file_path)))
 
 
 if __name__ == '__main__':
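
These tests rely on an @async_test decorator imported from
common.async_test_utils.AsyncTestUtils, which is not shown in this diff. A
typical implementation (a hypothetical sketch, not necessarily the one in the
common package) drives the coroutine test method to completion so unittest can
invoke it like an ordinary method:

    import asyncio
    import functools

    def async_test(coro):
        @functools.wraps(coro)
        def wrapper(*args, **kwargs):
            # Run the coroutine on a fresh event loop and return its result.
            return asyncio.run(coro(*args, **kwargs))
        return wrapper
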
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index aa143f3..a7059d6 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -65,7 +65,7 @@ class TestCollectionProcessor(unittest.TestCase):
         self.assertEqual(filled, expected)
 
     @async_test
-    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
     @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
     async def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history):
@@ -87,10 +87,12 @@ class TestCollectionProcessor(unittest.TestCase):
         mock_history.push.assert_called()
 
     @async_test
-    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
-    @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
     @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
-    async def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder,
+    async def test_process_granule_with_forward_processing_granule(self,
+                                                                   mock_publisher,
+                                                                   mock_history_builder,
                                                                    mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
         mock_history_builder.build.return_value = mock_history
@@ -110,11 +112,11 @@ class TestCollectionProcessor(unittest.TestCase):
         mock_history.push.assert_called()
 
     @async_test
-    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
     @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
     async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
-                                                                             mock_history_builder, mock_history):
+                                                                                   mock_history_builder, mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
         mock_history_builder.build.return_value = mock_history
 
@@ -132,7 +134,7 @@ class TestCollectionProcessor(unittest.TestCase):
         mock_history.push.assert_called()
 
     @async_test
-    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
     @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
     async def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history):
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index 14e7c3c..c9a75c0 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -210,4 +210,3 @@ collections:
         callback = AsyncMock()
         await CollectionWatcher._run_periodically(None, 0.1, callback)
         await AsyncAssert.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2)
-
diff --git a/common/common/async_utils/AsyncUtils.py b/common/common/async_utils/AsyncUtils.py
new file mode 100644
index 0000000..5fefd45
--- /dev/null
+++ b/common/common/async_utils/AsyncUtils.py
@@ -0,0 +1,11 @@
+import asyncio
+import functools
+
+
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
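
Because run_in_executor calls asyncio.get_running_loop(), a decorated function
must be awaited from inside a running event loop; invoking it from plain
synchronous code raises RuntimeError. A small self-contained demo (blocking_io
is a hypothetical function, not SDAP code) showing that two decorated blocking
calls overlap on the default thread pool:

    import asyncio
    import functools
    import time

    def run_in_executor(f):  # same helper as above, inlined for a standalone demo
        @functools.wraps(f)
        def inner(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return loop.run_in_executor(None, lambda: f(*args, **kwargs))
        return inner

    @run_in_executor
    def blocking_io(name):
        time.sleep(0.2)  # stands in for blocking file or Solr I/O
        return name

    async def main():
        start = time.perf_counter()
        results = await asyncio.gather(blocking_io("a"), blocking_io("b"))
        # Both sleeps run concurrently, so elapsed time is ~0.2s, not ~0.4s.
        print(results, f"{time.perf_counter() - start:.2f}s")

    asyncio.run(main())
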
diff --git a/common/common/async_utils/__init__.py b/common/common/async_utils/__init__.py
new file mode 100644
index 0000000..9a468e0
--- /dev/null
+++ b/common/common/async_utils/__init__.py
@@ -0,0 +1 @@
+from .AsyncUtils import run_in_executor
diff --git a/granule_ingester/README.md b/granule_ingester/README.md
index 112f52d..1339835 100644
--- a/granule_ingester/README.md
+++ b/granule_ingester/README.md
@@ -12,23 +12,24 @@ data to Cassandra and Solr.
 Python 3.7
 
 ## Building the service
-From `incubator-sdap-ingester/granule_ingester`, run:
-
-    $ python setup.py install
+From `incubator-sdap-ingester`, run:
+    $ cd common && python setup.py install
+    $ cd ../granule_ingester && python setup.py install
     
 
 ## Launching the service
-From `incubator-sdap-ingester/granule_ingester`, run:
+From `incubator-sdap-ingester`, run:
 
-    $ python granule_ingester/main.py -h
+    $ python granule_ingester/granule_ingester/main.py -h
     
 ## Running the tests
-From `incubator-sdap-ingester/granule_ingester`, run:
+From `incubator-sdap-ingester`, run:
 
-    $ pip install pytest
-    $ pytest
+    $ cd common && python setup.py install
+    $ cd ../granule_ingester && python setup.py install
+    $ pip install pytest && pytest
     
 ## Building the Docker image
-From `incubator-sdap-ingester/granule_ingester`, run:
+From `incubator-sdap-ingester`, run:
 
-    $ docker build . -f docker/Dockerfile -t nexusjpl/granule-ingester
+    $ docker build . -f granule_ingester/docker/Dockerfile -t nexusjpl/granule-ingester
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
index 4b25318..57bacff 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -6,14 +6,16 @@ ENV PATH="/opt/conda/bin:$PATH"
 
 RUN apk update --no-cache && apk add --no-cache --virtual .build-deps git openjdk8
 
-COPY /granule_ingester /sdap/granule_ingester
-COPY /setup.py /sdap/setup.py
-COPY /requirements.txt /sdap/requirements.txt
-COPY /conda-requirements.txt /sdap/conda-requirements.txt
-COPY /docker/install_nexusproto.sh /install_nexusproto.sh
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY granule_ingester/granule_ingester /sdap/granule_ingester
+COPY granule_ingester/setup.py /sdap/setup.py
+COPY granule_ingester/requirements.txt /sdap/requirements.txt
+COPY granule_ingester/conda-requirements.txt /sdap/conda-requirements.txt
+COPY granule_ingester/docker/install_nexusproto.sh /install_nexusproto.sh
+COPY granule_ingester/docker/entrypoint.sh /entrypoint.sh
 
 RUN ./install_nexusproto.sh
+RUN cd /common && python setup.py install
 RUN cd /sdap && python setup.py install
 
 RUN apk del .build-deps
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index e098672..67532b5 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -23,25 +23,18 @@ from pathlib import Path
 from typing import Dict
 
 import pysolr
-from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.exceptions import NoNodeError
-from nexusproto.DataTile_pb2 import TileSummary, NexusTile
+from kazoo.handlers.threading import KazooTimeoutError
 
-from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
+from common.async_utils.AsyncUtils import run_in_executor
+from granule_ingester.exceptions import (SolrFailedHealthCheckError,
+                                         SolrLostConnectionError)
 from granule_ingester.writers.MetadataStore import MetadataStore
+from nexusproto.DataTile_pb2 import NexusTile, TileSummary
 
 logger = logging.getLogger(__name__)
 
 
-def run_in_executor(f):
-    @functools.wraps(f)
-    def inner(*args, **kwargs):
-        loop = asyncio.get_running_loop()
-        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
-
-    return inner
-
-
 class SolrStore(MetadataStore):
     def __init__(self, solr_url=None, zk_url=None):
         super().__init__()