You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 22:49:15 UTC

[incubator-sdap-ingester] branch rabbitmq-fix updated (eda227f -> b2f3e38)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


 discard eda227f  conflict resolve
 discard 6c3d46d  use asyncio in the collection ingester
 discard 60c383b  solr history bug fixes
 discard 006e80b  use pysolr
 discard 95f023e  error handling
 discard abbf063  exc handling
 discard 1ab7753  propagate child worker exceptions up to main process
 discard bddc7c5  the healthchecks now raise exceptions if they rail
 discard 8352331  error handling
 discard b89c315  better exception handling
 discard 016452d  better error handling
     add e309727  add a on.update kopf event for crd updates
     add c599caf  SDAP-271 Cassandra authentication support (#11)
     new a1f0fe2  better error handling
     new 5a4ef11  better exception handling
     new 6b2e6a0  error handling
     new 1be91cd  the healthchecks now raise exceptions if they rail
     new 11789f4  propagate child worker exceptions up to main process
     new 3afb12c  exc handling
     new 02df569  error handling
     new 4b81e13  use pysolr
     new 6770300  solr history bug fixes
     new eb95f5e  use asyncio in the collection ingester
     new b2f3e38  conflict resolve

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (eda227f)
            \
             N -- N -- N   refs/heads/rabbitmq-fix (b2f3e38)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 config_operator/README.md                          | 10 +++++-
 .../config_source/RemoteGitConfig.py               |  4 +--
 config_operator/config_operator/main.py            | 38 +++++++++++++++++-----
 .../containers/k8s/config-operator-crd.yml         |  2 +-
 .../containers/k8s/deployment-git-src.yml          | 23 -------------
 config_operator/containers/k8s/git-repo-test.yml   |  4 +--
 granule_ingester/docker/entrypoint.sh              |  2 +-
 .../granule_ingester/exceptions/Exceptions.py      |  8 +++++
 granule_ingester/granule_ingester/main.py          | 28 +++++++---------
 9 files changed, 64 insertions(+), 55 deletions(-)
 delete mode 100644 config_operator/containers/k8s/deployment-git-src.yml


[incubator-sdap-ingester] 11/11: conflict resolve

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit b2f3e384025ab2c721c7c92afda1fdfbd993cdbe
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 12:32:57 2020 -0700

    conflict resolve
---
 .../services/CollectionWatcher.py                  | 32 ----------------------
 1 file changed, 32 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 00afce4..8911806 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -44,13 +44,9 @@ class CollectionWatcher:
         :return: None
         """
 
-<<<<<<< HEAD
         await self._run_periodically(loop=loop,
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
-=======
-        await self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
->>>>>>> use asyncio in the collection ingester
         self._observer.start()
 
     def collections(self) -> Set[Collection]:
@@ -136,34 +132,20 @@ class CollectionWatcher:
     async def _run_periodically(cls,
                                 loop: Optional[asyncio.AbstractEventLoop],
                                 wait_time: float,
-<<<<<<< HEAD
                                 func: Callable[[any], Awaitable],
                                 *args,
                                 **kwargs):
-=======
-                                coro,
-                                *args):
->>>>>>> use asyncio in the collection ingester
         """
         Call a function periodically. This uses asyncio, and is non-blocking.
         :param loop: An optional event loop to use. If None, the current running event loop will be used.
         :param wait_time: seconds to wait between iterations of func
-<<<<<<< HEAD
         :param func: the async function that will be awaited
-=======
-        :param coro: the coroutine that will be awaited
->>>>>>> use asyncio in the collection ingester
         :param args: any args that need to be provided to func
         """
         if loop is None:
             loop = asyncio.get_running_loop()
-<<<<<<< HEAD
         await func(*args, **kwargs)
         loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
-=======
-        await coro(*args)
-        loop.call_later(wait_time, asyncio.create_task, cls._run_periodically(loop, wait_time, coro))
->>>>>>> use asyncio in the collection ingester
 
 
 class _GranuleEventHandler(FileSystemEventHandler):
@@ -171,29 +153,19 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """
 
-<<<<<<< HEAD
     def __init__(self,
                  loop: asyncio.AbstractEventLoop,
                  callback: Callable[[str, Collection], Awaitable],
                  collections_for_dir: Set[Collection]):
         self._loop = loop
         self._callback = callback
-=======
-    def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro, collections_for_dir: Set[Collection]):
-        self._loop = loop
-        self._callback_coro = callback_coro
->>>>>>> use asyncio in the collection ingester
         self._collections_for_dir = collections_for_dir
 
     def on_created(self, event):
         super().on_created(event)
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
-<<<<<<< HEAD
                 self._loop.create_task(self._callback(event.src_path, collection))
-=======
-                self._loop.create_task(self._callback_coro(event.src_path, collection))
->>>>>>> use asyncio in the collection ingester
 
     def on_modified(self, event):
         super().on_modified(event)
@@ -202,8 +174,4 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
-<<<<<<< HEAD
                 self._loop.create_task(self._callback(event.src_path, collection))
-=======
-                self._loop.create_task(self._callback_coro(event.src_path, collection))
->>>>>>> use asyncio in the collection ingester


[incubator-sdap-ingester] 09/11: solr history bug fixes

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 67703003e505095edc6eab8843e899d20df62347
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 27 19:31:17 2020 -0700

    solr history bug fixes
---
 granule_ingester/conda-requirements.txt                | 2 +-
 granule_ingester/granule_ingester/consumer/Consumer.py | 1 +
 granule_ingester/granule_ingester/main.py              | 2 ++
 granule_ingester/granule_ingester/pipeline/Pipeline.py | 9 ++++++++-
 granule_ingester/granule_ingester/writers/SolrStore.py | 2 +-
 5 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index d40b54c..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -104,4 +104,5 @@ class Consumer(HealthCheck):
                 raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
                 await queue_iter.close()
+                await channel.close()
                 raise e
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 87a6d5a..e50a395 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -109,6 +109,8 @@ async def main(loop):
     cassandra_password = args.cassandra_password
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
+    cassandra_username = args.cassandra_username
+    cassandra_password = args.cassandra_password
     solr_host_and_port = args.solr_host_and_port
     zk_host_and_port = args.zk_host_and_port
 
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index a667d5e..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+import os
 import pickle
 import time
 from multiprocessing import Manager
@@ -99,6 +100,8 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
+        self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
         # Create a SyncManager so that we can to communicate exceptions from the
         # worker processes back to the main process.
         self._manager = Manager()
@@ -161,6 +164,8 @@ class Pipeline:
         return processor_module
 
     async def run(self):
+
+        logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
 
@@ -170,7 +175,9 @@ class Pipeline:
                                       dataset,
                                       self._data_store_factory,
                                       self._metadata_store_factory,
-                                      shared_memory)) as pool:
+                                      shared_memory),
+                            maxtasksperchild=self._max_concurrency,
+                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 926a75c..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -59,7 +59,7 @@ class SolrStore(MetadataStore):
 
     def _get_connection(self) -> pysolr.Solr:
         if self._zk_url:
-            zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+            zk = pysolr.ZooKeeper(f"{self._zk_url}")
             collections = {}
             for c in zk.zk.get_children("collections"):
                 collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))


[incubator-sdap-ingester] 07/11: error handling

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 02df569ded4172c489a1b98498d1b821289999e2
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 17:36:08 2020 -0700

    error handling
---
 granule_ingester/granule_ingester/main.py          |  5 ++--
 .../granule_ingester/pipeline/Pipeline.py          | 35 +++++++++++++---------
 2 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index bb9ad40..751da19 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -46,7 +46,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
     return True
 
 
-async def main():
+async def main(loop):
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
     parser.add_argument('--rabbitmq_host',
@@ -134,4 +134,5 @@ async def main():
 
 
 if __name__ == '__main__':
-    asyncio.run(main())
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main(loop))
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 14dc032..f1aa021 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -19,15 +19,15 @@ import time
 from multiprocessing import Manager
 from typing import List
 
-import aiomultiprocess
 import xarray as xr
 import yaml
+from aiomultiprocess import Pool
 from aiomultiprocess.types import ProxyException
 from nexusproto import DataTile_pb2 as nexusproto
 from tblib import pickling_support
 from yaml.scanner import ScannerError
 
-from granule_ingester.exceptions import PipelineBuildingError
+from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -36,7 +36,9 @@ from granule_ingester.writers import DataStore, MetadataStore
 
 logger = logging.getLogger(__name__)
 
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocessing library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
 
 _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
@@ -97,9 +99,12 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
-        # Create a SyncManager Namespace so that we can to communicate exceptions from the
+        # Create a SyncManager so that we can to communicate exceptions from the
         # worker processes back to the main process.
-        self._shared_memory = Manager().Namespace()
+        self._manager = Manager()
+
+    def __del__(self):
+        self._manager.shutdown()
 
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
@@ -158,26 +163,28 @@ class Pipeline:
     async def run(self):
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
-            async with aiomultiprocess.Pool(initializer=_init_worker,
-                                            initargs=(self._tile_processors,
-                                                      dataset,
-                                                      self._data_store_factory,
-                                                      self._metadata_store_factory,
-                                                      self._shared_memory)) as pool:
+
+            shared_memory = self._manager.Namespace()
+            async with Pool(initializer=_init_worker,
+                            initargs=(self._tile_processors,
+                                      dataset,
+                                      self._data_store_factory,
+                                      self._metadata_store_factory,
+                                      shared_memory)) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
-                for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+                for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
                     try:
                         await pool.map(_process_tile_in_worker, chunk)
                     except ProxyException:
                         pool.terminate()
-                        raise pickle.loads(self._shared_memory.error)
+                        raise pickle.loads(shared_memory.error)
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
 
     @staticmethod
-    def _chunk_list(items, chunk_size):
+    def _chunk_list(items, chunk_size: int):
         return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


[incubator-sdap-ingester] 06/11: exc handling

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 3afb12c159ef309decf21669597b03969270db74
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 14:08:05 2020 -0700

    exc handling
---
 granule_ingester/granule_ingester/consumer/Consumer.py     | 11 +++++++----
 granule_ingester/granule_ingester/exceptions/Exceptions.py |  8 +++++---
 granule_ingester/granule_ingester/exceptions/__init__.py   |  1 -
 granule_ingester/granule_ingester/main.py                  |  2 +-
 4 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 59db4e8..d40b54c 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,8 +17,8 @@ import logging
 
 import aio_pika
 
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
-    RabbitMQFailedHealthCheckError
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+    RabbitMQFailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -83,6 +83,9 @@ class Consumer(HealthCheck):
         except PipelineRunningError as e:
             await message.reject()
             logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+        except LostConnectionError:
+            # Let main() handle this
+            raise
         except Exception as e:
             await message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
@@ -98,7 +101,7 @@ class Consumer(HealthCheck):
             except aio_pika.exceptions.MessageProcessError:
                 # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
                 # connection has died, and attempting to close the queue will only raise another exception.
-                raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
+                raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
-                queue_iter.close()
+                await queue_iter.close()
                 raise e
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 7741ca6..0304b9b 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -25,7 +25,11 @@ class SolrLostConnectionError(LostConnectionError):
     pass
 
 
-class CassandraConnectionError(Exception):
+class RabbitMQLostConnectionError(LostConnectionError):
+    pass
+
+
+class CassandraLostConnectionError(LostConnectionError):
     pass
 
 
@@ -43,5 +47,3 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
 
 class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
     pass
-
-
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 838ccff..ea0969f 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,4 +1,3 @@
-from .Exceptions import CassandraConnectionError
 from .Exceptions import CassandraFailedHealthCheckError
 from .Exceptions import CassandraLostConnectionError
 from .Exceptions import FailedHealthCheckError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 86bc569..bb9ad40 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,7 +15,7 @@
 
 import argparse
 import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 import logging
 from functools import partial
 from typing import List


[incubator-sdap-ingester] 10/11: use asyncio in the collection ingester

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit eb95f5e6a72f038d53560d24e30e2bd914b70b39
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 28 15:09:03 2020 -0700

    use asyncio in the collection ingester
---
 .../services/CollectionWatcher.py                  | 32 ++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 8911806..00afce4 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -44,9 +44,13 @@ class CollectionWatcher:
         :return: None
         """
 
+<<<<<<< HEAD
         await self._run_periodically(loop=loop,
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
+=======
+        await self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
+>>>>>>> use asyncio in the collection ingester
         self._observer.start()
 
     def collections(self) -> Set[Collection]:
@@ -132,20 +136,34 @@ class CollectionWatcher:
     async def _run_periodically(cls,
                                 loop: Optional[asyncio.AbstractEventLoop],
                                 wait_time: float,
+<<<<<<< HEAD
                                 func: Callable[[any], Awaitable],
                                 *args,
                                 **kwargs):
+=======
+                                coro,
+                                *args):
+>>>>>>> use asyncio in the collection ingester
         """
         Call a function periodically. This uses asyncio, and is non-blocking.
         :param loop: An optional event loop to use. If None, the current running event loop will be used.
         :param wait_time: seconds to wait between iterations of func
+<<<<<<< HEAD
         :param func: the async function that will be awaited
+=======
+        :param coro: the coroutine that will be awaited
+>>>>>>> use asyncio in the collection ingester
         :param args: any args that need to be provided to func
         """
         if loop is None:
             loop = asyncio.get_running_loop()
+<<<<<<< HEAD
         await func(*args, **kwargs)
         loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
+=======
+        await coro(*args)
+        loop.call_later(wait_time, asyncio.create_task, cls._run_periodically(loop, wait_time, coro))
+>>>>>>> use asyncio in the collection ingester
 
 
 class _GranuleEventHandler(FileSystemEventHandler):
@@ -153,19 +171,29 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """
 
+<<<<<<< HEAD
     def __init__(self,
                  loop: asyncio.AbstractEventLoop,
                  callback: Callable[[str, Collection], Awaitable],
                  collections_for_dir: Set[Collection]):
         self._loop = loop
         self._callback = callback
+=======
+    def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro, collections_for_dir: Set[Collection]):
+        self._loop = loop
+        self._callback_coro = callback_coro
+>>>>>>> use asyncio in the collection ingester
         self._collections_for_dir = collections_for_dir
 
     def on_created(self, event):
         super().on_created(event)
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
+<<<<<<< HEAD
                 self._loop.create_task(self._callback(event.src_path, collection))
+=======
+                self._loop.create_task(self._callback_coro(event.src_path, collection))
+>>>>>>> use asyncio in the collection ingester
 
     def on_modified(self, event):
         super().on_modified(event)
@@ -174,4 +202,8 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
+<<<<<<< HEAD
                 self._loop.create_task(self._callback(event.src_path, collection))
+=======
+                self._loop.create_task(self._callback_coro(event.src_path, collection))
+>>>>>>> use asyncio in the collection ingester


[incubator-sdap-ingester] 08/11: use pysolr

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 4b81e13601a94ff9a943bbf4c9bd5e644c651272
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 16 18:24:00 2020 -0700

    use pysolr
---
 granule_ingester/docker/entrypoint.sh              |  1 +
 granule_ingester/granule_ingester/main.py          | 14 ++--
 .../granule_ingester/pipeline/Pipeline.py          |  4 +-
 .../granule_ingester/writers/SolrStore.py          | 81 ++++++++++++++--------
 granule_ingester/requirements.txt                  |  4 +-
 5 files changed, 67 insertions(+), 37 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index b703ee3..b8369c6 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -10,3 +10,4 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
   $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
   $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 751da19..87a6d5a 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,16 +15,15 @@
 
 import argparse
 import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 import logging
+import sys
 from functools import partial
 from typing import List
 
 from granule_ingester.consumer import Consumer
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore
-from granule_ingester.writers import SolrStore
-import sys
+from granule_ingester.writers import CassandraStore, SolrStore
 
 
 def cassandra_factory(contact_points, port, username, password):
@@ -33,8 +32,8 @@ def cassandra_factory(contact_points, port, username, password):
     return store
 
 
-def solr_factory(solr_host_and_port):
-    store = SolrStore(solr_host_and_port)
+def solr_factory(solr_host_and_port, zk_host_and_port):
+    store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
     store.connect()
     return store
 
@@ -86,6 +85,8 @@ async def main(loop):
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('--zk_host_and_port',
+                        metavar="HOST:PORT")
     parser.add_argument('-v',
                         '--verbose',
                         action='store_true',
@@ -109,6 +110,7 @@ async def main(loop):
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
     solr_host_and_port = args.solr_host_and_port
+    zk_host_and_port = args.zk_host_and_port
 
     consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
                         rabbitmq_username=args.rabbitmq_username,
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f1aa021..a667d5e 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -27,7 +27,7 @@ from nexusproto import DataTile_pb2 as nexusproto
 from tblib import pickling_support
 from yaml.scanner import ScannerError
 
-from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
+from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -180,6 +180,8 @@ class Pipeline:
                         await pool.map(_process_tile_in_worker, chunk)
                     except ProxyException:
                         pool.terminate()
+                        # Give the shared memory manager some time to write the exception
+                        # await asyncio.sleep(1)
                         raise pickle.loads(shared_memory.error)
 
         end = time.perf_counter()
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 6baad28..926a75c 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,64 +13,87 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from asyncio import AbstractEventLoop
-
+import asyncio
+import functools
+import json
 import logging
+from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
 
-import aiohttp
+import pysolr
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.exceptions import NoNodeError
 from nexusproto.DataTile_pb2 import *
-from tenacity import *
 
+from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
 from granule_ingester.writers.MetadataStore import MetadataStore
-from granule_ingester.exceptions import SolrFailedHealthCheckError
+
 logger = logging.getLogger(__name__)
 
 
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
+
+
 class SolrStore(MetadataStore):
-    def __init__(self, host_and_port='http://localhost:8983'):
+    def __init__(self, solr_url=None, zk_url=None):
         super().__init__()
 
         self.TABLE_NAME = "sea_surface_temp"
         self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
 
-        self._host_and_port = host_and_port
+        self._solr_url = solr_url
+        self._zk_url = zk_url
         self.geo_precision: int = 3
-        self.collection: str = "nexustiles"
+        self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
         self.log.setLevel(logging.DEBUG)
-        self._session = None
+        self._solr = None
+
+    def _get_connection(self) -> pysolr.Solr:
+        if self._zk_url:
+            zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+            collections = {}
+            for c in zk.zk.get_children("collections"):
+                collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
+            zk.collections = collections
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+        elif self._solr_url:
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+        else:
+            raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._session = aiohttp.ClientSession(loop=loop)
+        self._solr = self._get_connection()
 
     async def health_check(self):
         try:
-            async with aiohttp.ClientSession() as session:
-                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
-                if response.status == 200:
-                    return True
-                else:
-                    logger.error("Solr health check returned status {}.".format(response.status))
-        except aiohttp.ClientConnectionError as e:
-            raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
-
-        return False
+            connection = self._get_connection()
+            connection.ping()
+        except pysolr.SolrError:
+            raise SolrFailedHealthCheckError("Cannot connect to Solr!")
+        except NoNodeError:
+            raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
+        except KazooTimeoutError:
+            raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
 
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         solr_doc = self._build_solr_doc(nexus_tile)
+        await self._save_document(solr_doc)
 
-        await self._save_document(self.collection, solr_doc)
-
-    @retry(stop=stop_after_attempt(5))
-    async def _save_document(self, collection: str, doc: dict):
-        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
-        response = await self._session.post(url, json=doc)
-        if response.status < 200 or response.status >= 400:
-            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+    @run_in_executor
+    def _save_document(self, doc: dict):
+        try:
+            self._solr.add([doc])
+        except pysolr.SolrError:
+            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 0479f99..9b06860 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,4 +1,6 @@
 cassandra-driver==3.23.0
 aiomultiprocess==0.7.0
 aioboto3
-tblib==1.6.0
\ No newline at end of file
+tblib==1.6.0
+pysolr==3.9.0
+kazoo==2.8.0
\ No newline at end of file


[incubator-sdap-ingester] 01/11: better error handling

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit a1f0fe281d2c3d83bd77f7d04020aa3e0126a46b
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 8 20:16:29 2020 -0500

    better error handling
---
 .../granule_ingester/consumer/Consumer.py          |  7 +++-
 .../granule_ingester/pipeline/Pipeline.py          | 39 +++++++++++-----------
 granule_ingester/tests/pipeline/test_Pipeline.py   |  9 ++---
 3 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..31454c1 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -19,6 +19,7 @@ import aio_pika
 
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
+from granule_ingester.exceptions import PipelineBuildingError
 
 logger = logging.getLogger(__name__)
 
@@ -74,9 +75,13 @@ class Consumer(HealthCheck):
                                             metadata_store_factory=metadata_store_factory)
             await pipeline.run()
             message.ack()
+        except PipelineBuildingError as e:
+            message.reject()
+            logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+                             f"from RabbitMQ. The exception was:\n{e}")
         except Exception as e:
             message.reject(requeue=True)
-            logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+            logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self):
         channel = await self._connection.channel()
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..f872e4d 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,10 +17,11 @@
 import logging
 import time
 from typing import List
-
+from granule_ingester.exceptions import PipelineBuildingError
 import aiomultiprocess
 import xarray as xr
 import yaml
+from yaml.scanner import ScannerError
 from nexusproto import DataTile_pb2 as nexusproto
 
 from granule_ingester.granule_loaders import GranuleLoader
@@ -90,38 +91,38 @@ class Pipeline:
 
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
-        config = yaml.load(config_str, yaml.FullLoader)
-        return cls._build_pipeline(config,
-                                   data_store_factory,
-                                   metadata_store_factory,
-                                   processor_module_mappings)
-
-    @classmethod
-    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
-        with open(config_path) as config_file:
-            config = yaml.load(config_file, yaml.FullLoader)
+        try:
+            config = yaml.load(config_str, yaml.FullLoader)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
                                        processor_module_mappings)
 
+        except yaml.scanner.ScannerError:
+            raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
                         data_store_factory,
                         metadata_store_factory,
                         module_mappings: dict):
-        granule_loader = GranuleLoader(**config['granule'])
+        try:
+            granule_loader = GranuleLoader(**config['granule'])
 
-        slicer_config = config['slicer']
-        slicer = cls._parse_module(slicer_config, module_mappings)
+            slicer_config = config['slicer']
+            slicer = cls._parse_module(slicer_config, module_mappings)
 
-        tile_processors = []
-        for processor_config in config['processors']:
-            module = cls._parse_module(processor_config, module_mappings)
-            tile_processors.append(module)
+            tile_processors = []
+            for processor_config in config['processors']:
+                module = cls._parse_module(processor_config, module_mappings)
+                tile_processors.append(module)
 
-        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+            return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+        except KeyError as e:
+            raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+        except Exception:
+            raise PipelineBuildingError("Cannot build pipeline.")
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
                 pass
 
         relative_path = "../config_files/ingestion_config_testfile.yaml"
-        file_path = os.path.join(os.path.dirname(__file__), relative_path)
-        pipeline = Pipeline.from_file(config_path=str(file_path),
-                                      data_store_factory=MockDataStore,
-                                      metadata_store_factory=MockMetadataStore)
+        with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+            yaml_str = file.read()
+        pipeline = Pipeline.from_string(config_str=yaml_str,
+                                        data_store_factory=MockDataStore,
+                                        metadata_store_factory=MockMetadataStore)
 
         self.assertEqual(pipeline._data_store_factory, MockDataStore)
         self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)


[incubator-sdap-ingester] 04/11: the healthchecks now raise exceptions if they rail

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 1be91cdca001fc15639bb175efdbdb1115e15dca
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Fri Jul 10 10:14:05 2020 -0500

    the healthchecks now raise exceptions if they rail
---
 granule_ingester/granule_ingester/consumer/Consumer.py | 12 ++++++------
 granule_ingester/granule_ingester/main.py              |  1 +
 granule_ingester/granule_ingester/writers/DataStore.py |  1 +
 granule_ingester/granule_ingester/writers/SolrStore.py |  4 ++--
 4 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 439415f..59db4e8 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,9 @@
 import logging
 
 import aio_pika
-import sys
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
+
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
+    RabbitMQFailedHealthCheckError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -48,8 +49,8 @@ class Consumer(HealthCheck):
             await connection.close()
             return True
         except:
-            logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
-            return False
+            raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+                                                 f"Connection string was {self._connection_string}")
 
     async def _get_connection(self) -> aio_pika.Connection:
         return await aio_pika.connect_robust(self._connection_string)
@@ -97,8 +98,7 @@ class Consumer(HealthCheck):
             except aio_pika.exceptions.MessageProcessError:
                 # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
                 # connection has died, and attempting to close the queue will only raise another exception.
-                raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+                raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
                 queue_iter.close()
                 raise e
-
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index a7a66c6..86bc569 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,6 +15,7 @@
 
 import argparse
 import asyncio
+from granule_ingester.exceptions import FailedHealthCheckError
 import logging
 from functools import partial
 from typing import List
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
index 889d41e..a64399b 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -7,6 +7,7 @@ from granule_ingester.healthcheck import HealthCheck
 
 class DataStore(HealthCheck, ABC):
 
+
     @abstractmethod
     def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
         pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 9d6a7f0..6baad28 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -26,7 +26,7 @@ from nexusproto.DataTile_pb2 import *
 from tenacity import *
 
 from granule_ingester.writers.MetadataStore import MetadataStore
-
+from granule_ingester.exceptions import SolrFailedHealthCheckError
 logger = logging.getLogger(__name__)
 
 
@@ -56,7 +56,7 @@ class SolrStore(MetadataStore):
                 else:
                     logger.error("Solr health check returned status {}.".format(response.status))
         except aiohttp.ClientConnectionError as e:
-            logger.error("Cannot connect to Solr!")
+            raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
 
         return False
 


[incubator-sdap-ingester] 05/11: propagate child worker exceptions up to main process

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 11789f4ed7587d481c93aaf8b90b5a6f54b0dfc0
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 13:52:50 2020 -0700

    propagate child worker exceptions up to main process
---
 .../granule_ingester/exceptions/Exceptions.py      |  6 +++
 .../granule_ingester/exceptions/__init__.py        |  1 +
 .../granule_ingester/pipeline/Pipeline.py          | 53 ++++++++++++++--------
 granule_ingester/requirements.txt                  |  1 +
 4 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index c648b99..7741ca6 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -25,6 +25,10 @@ class SolrLostConnectionError(LostConnectionError):
     pass
 
 
+class CassandraConnectionError(Exception):
+    pass
+
+
 class FailedHealthCheckError(Exception):
     pass
 
@@ -39,3 +43,5 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
 
 class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
     pass
+
+
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index ea0969f..838ccff 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,3 +1,4 @@
+from .Exceptions import CassandraConnectionError
 from .Exceptions import CassandraFailedHealthCheckError
 from .Exceptions import CassandraLostConnectionError
 from .Exceptions import FailedHealthCheckError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e52d99f..14dc032 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,20 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import logging
+import pickle
 import time
+from multiprocessing import Manager
 from typing import List
 
 import aiomultiprocess
 import xarray as xr
 import yaml
 from aiomultiprocess.types import ProxyException
-from cassandra.cluster import NoHostAvailable
 from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
 from yaml.scanner import ScannerError
 
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -41,13 +42,15 @@ _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
 _worker_processor_list: List[TileProcessor] = None
 _worker_dataset = None
+_shared_memory = None
 
 
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
     global _worker_data_store
     global _worker_metadata_store
     global _worker_processor_list
     global _worker_dataset
+    global _shared_memory
 
     # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
     # however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -55,23 +58,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
     _worker_metadata_store = metadata_store_factory()
     _worker_processor_list = processor_list
     _worker_dataset = dataset
+    _shared_memory = shared_memory
 
 
 async def _process_tile_in_worker(serialized_input_tile: str):
-    global _worker_data_store
-    global _worker_metadata_store
-    global _worker_processor_list
-    global _worker_dataset
+    try:
+        input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+        processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
 
-    input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
-    processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
-
-    if processed_tile:
-        # try:
-        await _worker_data_store.save_data(processed_tile)
-        await _worker_metadata_store.save_metadata(processed_tile)
-        # except NoHostAvailable as e:
-        #     logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
+        if processed_tile:
+            await _worker_data_store.save_data(processed_tile)
+            await _worker_metadata_store.save_metadata(processed_tile)
+    except Exception as e:
+        pickling_support.install(e)
+        _shared_memory.error = pickle.dumps(e)
+        raise
 
 
 def _recurse(processor_list: List[TileProcessor],
@@ -96,10 +97,15 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
+        # Create a SyncManager Namespace so that we can communicate exceptions from the
+        # worker processes back to the main process.
+        self._shared_memory = Manager().Namespace()
+
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
         try:
             config = yaml.load(config_str, yaml.FullLoader)
+            cls._validate_config(config)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
@@ -108,6 +114,12 @@ class Pipeline:
         except yaml.scanner.ScannerError:
             raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
 
+    # TODO: this method should validate the config against an actual schema definition
+    @staticmethod
+    def _validate_config(config: dict):
+        if type(config) is not dict:
+            raise PipelineBuildingError("Cannot build pipeline because the config is not valid YAML.")
+
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
@@ -150,17 +162,18 @@ class Pipeline:
                                             initargs=(self._tile_processors,
                                                       dataset,
                                                       self._data_store_factory,
-                                                      self._metadata_store_factory)) as pool:
+                                                      self._metadata_store_factory,
+                                                      self._shared_memory)) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
-                for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+                for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
                     try:
                         await pool.map(_process_tile_in_worker, chunk)
                     except ProxyException:
                         pool.terminate()
-                        raise PipelineRunningError("Running the pipeline failed and could not recover.")
+                        raise pickle.loads(self._shared_memory.error)
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index a6d64a2..0479f99 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,4 @@
 cassandra-driver==3.23.0
 aiomultiprocess==0.7.0
 aioboto3
+tblib==1.6.0
\ No newline at end of file


[incubator-sdap-ingester] 03/11: error handling

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 6b2e6a0b005e1295cbafa74d553dfdd09ca9550d
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 9 17:15:03 2020 -0500

    error handling
---
 .../granule_ingester/consumer/Consumer.py          | 30 ++++++++++++++--------
 granule_ingester/granule_ingester/main.py          |  1 +
 .../granule_ingester/pipeline/Pipeline.py          |  4 +++
 3 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index fadfe75..439415f 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,8 @@
 import logging
 
 import aio_pika
-
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+import sys
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -40,7 +40,7 @@ class Consumer(HealthCheck):
         self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
                                                                                 password=rabbitmq_password,
                                                                                 host=rabbitmq_host)
-        self._connection = None
+        self._connection: aio_pika.Connection = None
 
     async def health_check(self) -> bool:
         try:
@@ -51,7 +51,7 @@ class Consumer(HealthCheck):
             logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
             return False
 
-    async def _get_connection(self):
+    async def _get_connection(self) -> aio_pika.Connection:
         return await aio_pika.connect_robust(self._connection_string)
 
     async def __aenter__(self):
@@ -74,23 +74,31 @@ class Consumer(HealthCheck):
                                             data_store_factory=data_store_factory,
                                             metadata_store_factory=metadata_store_factory)
             await pipeline.run()
-            message.ack()
+            await message.ack()
         except PipelineBuildingError as e:
-            message.reject()
+            await message.reject()
             logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
                              f"from RabbitMQ. The exception was:\n{e}")
         except PipelineRunningError as e:
-            message.reject()
+            await message.reject()
             logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
         except Exception as e:
-            message.reject(requeue=True)
+            await message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
         queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
-        async with queue.iterator() as queue_iter:
-            async for message in queue_iter:
+        queue_iter = queue.iterator()
+        async for message in queue_iter:
+            try:
                 await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+            except aio_pika.exceptions.MessageProcessError:
+                # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+                # connection has died, and attempting to close the queue will only raise another exception.
+                raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+            except Exception as e:
+                queue_iter.close()
+                raise e
+
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 9010e33..a7a66c6 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -23,6 +23,7 @@ from granule_ingester.consumer import Consumer
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.writers import CassandraStore
 from granule_ingester.writers import SolrStore
+import sys
 
 
 def cassandra_factory(contact_points, port, username, password):
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index c7b5d6a..e52d99f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -22,6 +22,7 @@ import aiomultiprocess
 import xarray as xr
 import yaml
 from aiomultiprocess.types import ProxyException
+from cassandra.cluster import NoHostAvailable
 from nexusproto import DataTile_pb2 as nexusproto
 from yaml.scanner import ScannerError
 
@@ -66,8 +67,11 @@ async def _process_tile_in_worker(serialized_input_tile: str):
     processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
 
     if processed_tile:
+        # try:
         await _worker_data_store.save_data(processed_tile)
         await _worker_metadata_store.save_metadata(processed_tile)
+        # except NoHostAvailable as e:
+        #     logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
 
 
 def _recurse(processor_list: List[TileProcessor],


[incubator-sdap-ingester] 02/11: better exception handling

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 5a4ef11212facdcfdb30f7692953a034ed380a25
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 9 11:31:28 2020 -0500

    better exception handling
---
 .../granule_ingester/consumer/Consumer.py            |  5 ++++-
 .../granule_ingester/pipeline/Pipeline.py            | 13 ++++++++++---
 .../reading_processors/TileReadingProcessor.py       | 20 +++++++++-----------
 granule_ingester/requirements.txt                    |  2 +-
 4 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 31454c1..fadfe75 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,9 +17,9 @@ import logging
 
 import aio_pika
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
-from granule_ingester.exceptions import PipelineBuildingError
 
 logger = logging.getLogger(__name__)
 
@@ -79,6 +79,9 @@ class Consumer(HealthCheck):
             message.reject()
             logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
                              f"from RabbitMQ. The exception was:\n{e}")
+        except PipelineRunningError as e:
+            message.reject()
+            logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
         except Exception as e:
             message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f872e4d..c7b5d6a 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,13 +17,15 @@
 import logging
 import time
 from typing import List
-from granule_ingester.exceptions import PipelineBuildingError
+
 import aiomultiprocess
 import xarray as xr
 import yaml
-from yaml.scanner import ScannerError
+from aiomultiprocess.types import ProxyException
 from nexusproto import DataTile_pb2 as nexusproto
+from yaml.scanner import ScannerError
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -62,6 +64,7 @@ async def _process_tile_in_worker(serialized_input_tile: str):
 
     input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
     processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
+
     if processed_tile:
         await _worker_data_store.save_data(processed_tile)
         await _worker_metadata_store.save_metadata(processed_tile)
@@ -149,7 +152,11 @@ class Pipeline:
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
                 for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
-                    await pool.map(_process_tile_in_worker, chunk)
+                    try:
+                        await pool.map(_process_tile_in_worker, chunk)
+                    except ProxyException:
+                        pool.terminate()
+                        raise PipelineRunningError("Running the pipeline failed and could not recover.")
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
 import xarray as xr
 from nexusproto import DataTile_pb2 as nexusproto
 
+from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
 
 
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
         self.latitude = latitude
         self.longitude = longitude
 
-        # Common optional properties
-        self.temp_dir = None
-        self.metadata = None
-        # self.temp_dir = self.environ['TEMP_DIR']
-        # self.metadata = self.environ['META']
-
     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
-        dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+        try:
+            dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
 
-        output_tile = nexusproto.NexusTile()
-        output_tile.CopyFrom(tile)
-        output_tile.summary.data_var_name = self.variable_to_read
+            output_tile = nexusproto.NexusTile()
+            output_tile.CopyFrom(tile)
+            output_tile.summary.data_var_name = self.variable_to_read
 
-        return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+            return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+        except Exception:
+            raise TileProcessingError("Could not generate tiles from the granule.")
 
     @abstractmethod
     def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 4d9d4cb..a6d64a2 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,3 @@
 cassandra-driver==3.23.0
-aiomultiprocess
+aiomultiprocess==0.7.0
 aioboto3