You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/30 01:14:47 UTC
[incubator-sdap-ingester] branch rabbitmq-fix updated (1a66bc1 ->
2525001)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a change to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.
discard 1a66bc1 use asyncio in the collection ingester
discard c2c4479 solr history bug fixes
discard 40a62b0 use pysolr
discard 27f3a35 error handling
discard c3f1c5a exc handling
discard ceea9a5 propagate child worker exceptions up to main process
discard c6fdd6f the healthchecks now raise exceptions if they fail
discard 8344d30 error handling
discard 98bc25d better exception handling
discard 8a12adf better error handling
add ecd5ecc SDAP-247: config-operator unit tests and support for git username/token (#6)
new 6efb623 better error handling
new 7feb58d better exception handling
new 0c97d99 error handling
new 562ef6f the healthchecks now raise exceptions if they fail
new de8c5a0 propagate child worker exceptions up to main process
new 41755a5 exc handling
new 385f5b8 error handling
new fd5cc22 use pysolr
new cdd30ae solr history bug fixes
new 2525001 use asyncio in the collection ingester
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
 * -- * -- B -- O -- O -- O   (1a66bc1)
            \
             N -- N -- N   refs/heads/rabbitmq-fix (2525001)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
config_operator/README.md | 6 +-
.../config_source/LocalDirConfig.py | 40 ++++++------
.../config_source/RemoteGitConfig.py | 51 +++++++++------
.../config_operator/k8s/K8sConfigMap.py | 26 +++++---
config_operator/config_operator/main.py | 12 +++-
config_operator/containers/k8s/git-repo-test.yml | 9 +++
config_operator/requirements.txt | 1 +
.../tests/config_source/test_LocalDirConfig.py | 72 ++++++++++++++++++++++
.../tests/config_source/test_RemoteGitConfig.py | 49 +++++++++++++++
config_operator/tests/k8s/test_K8sConfigMap.py | 53 +++++++++++++++-
.../resources/localDirBadTest/collections.yml | 2 +
.../tests/resources/localDirTest/.hidden_file.txt | 1 +
.../tests/resources/localDirTest/README.md | 1 +
.../tests/resources/localDirTest/collections.yml | 1 +
14 files changed, 269 insertions(+), 55 deletions(-)
create mode 100644 config_operator/containers/k8s/git-repo-test.yml
create mode 100644 config_operator/tests/config_source/test_LocalDirConfig.py
create mode 100644 config_operator/tests/config_source/test_RemoteGitConfig.py
create mode 100644 config_operator/tests/resources/localDirBadTest/collections.yml
create mode 100644 config_operator/tests/resources/localDirTest/.hidden_file.txt
create mode 100644 config_operator/tests/resources/localDirTest/README.md
create mode 100644 config_operator/tests/resources/localDirTest/collections.yml
[incubator-sdap-ingester] 04/10: the healthchecks now raise
exceptions if they fail
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 562ef6f3d221dc84b2529a2bdc3ba4a319859ff2
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Fri Jul 10 10:14:05 2020 -0500
the healthchecks now raise exceptions if they fail
---
.../granule_ingester/consumer/Consumer.py | 12 ++++++------
.../granule_ingester/exceptions/Exceptions.py | 18 +++++++++++++++++-
.../granule_ingester/exceptions/__init__.py | 6 +++++-
granule_ingester/granule_ingester/main.py | 20 ++++++++++----------
.../granule_ingester/writers/CassandraStore.py | 9 +++++----
.../granule_ingester/writers/DataStore.py | 1 +
.../granule_ingester/writers/SolrStore.py | 4 ++--
7 files changed, 46 insertions(+), 24 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 439415f..59db4e8 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,9 @@
import logging
import aio_pika
-import sys
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
+
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
+ RabbitMQFailedHealthCheckError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
@@ -48,8 +49,8 @@ class Consumer(HealthCheck):
await connection.close()
return True
except:
- logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
- return False
+ raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+ f"Connection string was {self._connection_string}")
async def _get_connection(self) -> aio_pika.Connection:
return await aio_pika.connect_robust(self._connection_string)
@@ -97,8 +98,7 @@ class Consumer(HealthCheck):
except aio_pika.exceptions.MessageProcessError:
# Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
# connection has died, and attempting to close the queue will only raise another exception.
- raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+ raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
except Exception as e:
queue_iter.close()
raise e
-
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 0dda59e..6e7d89a 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -10,5 +10,21 @@ class TileProcessingError(Exception):
pass
-class ConnectionErrorRabbitMQ(Exception):
+class RabbitMQConnectionError(Exception):
+ pass
+
+
+class FailedHealthCheckError(Exception):
+ pass
+
+
+class CassandraFailedHealthCheckError(FailedHealthCheckError):
+ pass
+
+
+class SolrFailedHealthCheckError(FailedHealthCheckError):
+ pass
+
+
+class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index d0c2344..2ba1b4a 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,4 +1,8 @@
-from .Exceptions import ConnectionErrorRabbitMQ
+from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import FailedHealthCheckError
from .Exceptions import PipelineBuildingError
from .Exceptions import PipelineRunningError
+from .Exceptions import RabbitMQConnectionError
+from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import SolrFailedHealthCheckError
from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 6e1a289..2754c7f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,6 +15,7 @@
import argparse
import asyncio
+from granule_ingester.exceptions import FailedHealthCheckError
import logging
from functools import partial
from typing import List
@@ -106,16 +107,15 @@ async def main():
data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
metadata_store_factory=partial(solr_factory, solr_host_and_port))
try:
- if await run_health_checks(
- [CassandraStore(cassandra_contact_points, cassandra_port),
- SolrStore(solr_host_and_port),
- consumer]):
- async with consumer:
- logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
- await consumer.start_consuming()
- else:
- logger.error("Quitting because not all dependencies passed the health checks.")
- sys.exit(1)
+ await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
+ SolrStore(solr_host_and_port),
+ consumer])
+ async with consumer:
+ logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
+ await consumer.start_consuming()
+ except FailedHealthCheckError as e:
+ logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
+ sys.exit(1)
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
sys.exit(1)
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index ffac63d..530871d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -23,6 +23,7 @@ from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
from nexusproto.DataTile_pb2 import NexusTile, TileData
+from granule_ingester.exceptions import CassandraFailedHealthCheckError
from granule_ingester.writers.DataStore import DataStore
logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -47,9 +48,8 @@ class CassandraStore(DataStore):
session = self._get_session()
session.shutdown()
return True
- except:
- logger.error("Cannot connect to Cassandra!")
- return False
+ except Exception:
+ raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
def _get_session(self) -> Session:
cluster = Cluster(contact_points=self._contact_points, port=self._port)
@@ -69,7 +69,8 @@ class CassandraStore(DataStore):
tile_id = uuid.UUID(tile.summary.tile_id)
serialized_tile_data = TileData.SerializeToString(tile.tile)
prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
- await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+ await type(self)._execute_query_async(self._session, prepared_query,
+ [tile_id, bytearray(serialized_tile_data)])
except NoHostAvailable as e:
logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
index 889d41e..a64399b 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -7,6 +7,7 @@ from granule_ingester.healthcheck import HealthCheck
class DataStore(HealthCheck, ABC):
+
@abstractmethod
def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 9d6a7f0..6baad28 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -26,7 +26,7 @@ from nexusproto.DataTile_pb2 import *
from tenacity import *
from granule_ingester.writers.MetadataStore import MetadataStore
-
+from granule_ingester.exceptions import SolrFailedHealthCheckError
logger = logging.getLogger(__name__)
@@ -56,7 +56,7 @@ class SolrStore(MetadataStore):
else:
logger.error("Solr health check returned status {}.".format(response.status))
except aiohttp.ClientConnectionError as e:
- logger.error("Cannot connect to Solr!")
+ raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
return False
[incubator-sdap-ingester] 09/10: solr history bug fixes
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit cdd30aef489f6e7d414b09d84ff970f120d2c1ea
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 27 19:31:17 2020 -0700
solr history bug fixes
---
collection_manager/collection_manager/main.py | 2 +-
.../history_manager/SolrIngestionHistory.py | 19 +++++++---------
granule_ingester/conda-requirements.txt | 2 +-
granule_ingester/docker/entrypoint.sh | 4 +++-
.../granule_ingester/consumer/Consumer.py | 1 +
granule_ingester/granule_ingester/main.py | 25 ++++++++++++++++++----
.../granule_ingester/pipeline/Pipeline.py | 9 +++++++-
.../granule_ingester/writers/CassandraStore.py | 14 ++++++++++--
.../granule_ingester/writers/SolrStore.py | 2 +-
9 files changed, 56 insertions(+), 22 deletions(-)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 7e72de5..43b687e 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -84,7 +84,7 @@ async def main():
return
except Exception as e:
- logger.error(e)
+ logger.exception(e)
return
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 1ae7156..4e6d3e5 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -35,8 +35,8 @@ class SolrIngestionHistory(IngestionHistory):
try:
self._solr_url = solr_url
self._create_collection_if_needed()
- self._solr_granules = pysolr.Solr('/'.join([solr_url.strip('/'), self._granule_collection_name]))
- self._solr_datasets = pysolr.Solr('/'.join([solr_url.strip('/'), self._dataset_collection_name]))
+ self._solr_granules = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._granule_collection_name}")
+ self._solr_datasets = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._dataset_collection_name}")
self._dataset_id = dataset_id
self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
self._latest_ingested_file_update = self._get_latest_file_update()
@@ -63,7 +63,7 @@ class SolrIngestionHistory(IngestionHistory):
self._solr_datasets.add([{
'id': self._dataset_id,
'dataset_s': self._dataset_id,
- 'latest_update_l': self._latest_ingested_file_update}])
+ 'latest_update_l': int(self._latest_ingested_file_update)}])
self._solr_datasets.commit()
def _get_latest_file_update(self):
@@ -87,8 +87,7 @@ class SolrIngestionHistory(IngestionHistory):
self._req_session = requests.session()
payload = {'action': 'CLUSTERSTATUS'}
- result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
- params=payload)
+ result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
response = result.json()
node_number = len(response['cluster']['live_nodes'])
@@ -100,12 +99,11 @@ class SolrIngestionHistory(IngestionHistory):
'name': self._granule_collection_name,
'numShards': node_number
}
- result = self._req_session.get('/'.join([self._solr_url.strip("/"), 'admin', 'collections']),
- params=payload)
+ result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
response = result.json()
logger.info(f"solr collection created {response}")
# Update schema
- schema_url = '/'.join([self._solr_url.strip('/'), self._granule_collection_name, 'schema'])
+ schema_url = f"{self._solr_url.strip('/')}/{self._granule_collection_name}/schema"
# granule_s # dataset_s so that all the granule of a dataset are less likely to be on the same shard
# self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
self._add_field(schema_url, "dataset_s", "StrField")
@@ -121,13 +119,12 @@ class SolrIngestionHistory(IngestionHistory):
'name': self._dataset_collection_name,
'numShards': node_number
}
- result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
- params=payload)
+ result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
response = result.json()
logger.info(f"solr collection created {response}")
# Update schema
# http://localhost:8983/solr/nexusdatasets/schema?_=1588555874864&wt=json
- schema_url = '/'.join([self._solr_url.strip('/'), self._dataset_collection_name, 'schema'])
+ schema_url = f"{self._solr_url.strip('/')}/{self._granule_collection_name}/schema"
# self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
self._add_field(schema_url, "dataset_s", "StrField")
self._add_field(schema_url, "latest_update_l", "TrieLongField")
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
pyyaml==5.3.1
requests==2.23.0
aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
tenacity
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index 3a1cb9b..2b6174a 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,5 +7,7 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
$([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
$([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
- $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+ $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
+ $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
+ $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) \
$([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index d40b54c..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -104,4 +104,5 @@ class Consumer(HealthCheck):
raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
except Exception as e:
await queue_iter.close()
+ await channel.close()
raise e
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index f602da8..8b8d40f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -26,8 +26,8 @@ from granule_ingester.healthcheck import HealthCheck
from granule_ingester.writers import CassandraStore, SolrStore
-def cassandra_factory(contact_points, port):
- store = CassandraStore(contact_points, port)
+def cassandra_factory(contact_points, port, username, password):
+ store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password)
store.connect()
return store
@@ -73,6 +73,14 @@ async def main(loop):
default=9042,
metavar="PORT",
help='Cassandra port. (Default: 9042)')
+ parser.add_argument('--cassandra_username',
+ metavar="USERNAME",
+ default=None,
+ help='Cassandra username. Optional.')
+ parser.add_argument('--cassandra_password',
+ metavar="PASSWORD",
+ default=None,
+ help='Cassandra password. Optional.')
parser.add_argument('--solr_host_and_port',
default='http://localhost:8983',
metavar='HOST:PORT',
@@ -99,6 +107,8 @@ async def main(loop):
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
+ cassandra_username = args.cassandra_username
+ cassandra_password = args.cassandra_password
solr_host_and_port = args.solr_host_and_port
zk_host_and_port = args.zk_host_and_port
@@ -106,11 +116,18 @@ async def main(loop):
rabbitmq_username=args.rabbitmq_username,
rabbitmq_password=args.rabbitmq_password,
rabbitmq_queue=args.rabbitmq_queue,
- data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
+ data_store_factory=partial(cassandra_factory,
+ cassandra_contact_points,
+ cassandra_port,
+ cassandra_username,
+ cassandra_password),
metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
try:
solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
- await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
+ await run_health_checks([CassandraStore(cassandra_contact_points,
+ cassandra_port,
+ cassandra_username,
+ cassandra_password),
solr_store,
consumer])
async with consumer:
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index a667d5e..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+import os
import pickle
import time
from multiprocessing import Manager
@@ -99,6 +100,8 @@ class Pipeline:
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
+ self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
# Create a SyncManager so that we can to communicate exceptions from the
# worker processes back to the main process.
self._manager = Manager()
@@ -161,6 +164,8 @@ class Pipeline:
return processor_module
async def run(self):
+
+ logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
async with self._granule_loader as (dataset, granule_name):
start = time.perf_counter()
@@ -170,7 +175,9 @@ class Pipeline:
dataset,
self._data_store_factory,
self._metadata_store_factory,
- shared_memory)) as pool:
+ shared_memory),
+ maxtasksperchild=self._max_concurrency,
+ childconcurrency=self._max_concurrency) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 6b2cf32..cb5232b 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,6 +18,7 @@ import asyncio
import logging
import uuid
+from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session, NoHostAvailable
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
@@ -39,8 +40,10 @@ class TileModel(Model):
class CassandraStore(DataStore):
- def __init__(self, contact_points=None, port=9042):
+ def __init__(self, contact_points=None, port=9042, username=None, password=None):
self._contact_points = contact_points
+ self._username = username
+ self._password = password
self._port = port
self._session = None
@@ -53,11 +56,18 @@ class CassandraStore(DataStore):
raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
def _get_session(self) -> Session:
+
+ if self._username and self._password:
+ auth_provider = PlainTextAuthProvider(username=self._username, password=self._password)
+ else:
+ auth_provider = None
+
cluster = Cluster(contact_points=self._contact_points,
port=self._port,
# load_balancing_policy=
reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
- default_retry_policy=RetryPolicy())
+ default_retry_policy=RetryPolicy(),
+ auth_provider=auth_provider)
session = cluster.connect()
session.set_keyspace('nexustiles')
return session
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 926a75c..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -59,7 +59,7 @@ class SolrStore(MetadataStore):
def _get_connection(self) -> pysolr.Solr:
if self._zk_url:
- zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+ zk = pysolr.ZooKeeper(f"{self._zk_url}")
collections = {}
for c in zk.zk.get_children("collections"):
collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
[incubator-sdap-ingester] 03/10: error handling
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 0c97d99a80ae529a209f07e93bf9cba256bb2d93
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 9 17:15:03 2020 -0500
error handling
---
.../granule_ingester/consumer/Consumer.py | 30 ++++++++++++++--------
.../granule_ingester/exceptions/Exceptions.py | 4 +++
.../granule_ingester/exceptions/__init__.py | 3 ++-
granule_ingester/granule_ingester/main.py | 24 ++++++++++-------
.../granule_ingester/pipeline/Pipeline.py | 4 +++
.../granule_ingester/writers/CassandraStore.py | 13 ++++++----
6 files changed, 52 insertions(+), 26 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index fadfe75..439415f 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,8 @@
import logging
import aio_pika
-
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+import sys
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
@@ -40,7 +40,7 @@ class Consumer(HealthCheck):
self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
password=rabbitmq_password,
host=rabbitmq_host)
- self._connection = None
+ self._connection: aio_pika.Connection = None
async def health_check(self) -> bool:
try:
@@ -51,7 +51,7 @@ class Consumer(HealthCheck):
logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
return False
- async def _get_connection(self):
+ async def _get_connection(self) -> aio_pika.Connection:
return await aio_pika.connect_robust(self._connection_string)
async def __aenter__(self):
@@ -74,23 +74,31 @@ class Consumer(HealthCheck):
data_store_factory=data_store_factory,
metadata_store_factory=metadata_store_factory)
await pipeline.run()
- message.ack()
+ await message.ack()
except PipelineBuildingError as e:
- message.reject()
+ await message.reject()
logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
f"from RabbitMQ. The exception was:\n{e}")
except PipelineRunningError as e:
- message.reject()
+ await message.reject()
logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
except Exception as e:
- message.reject(requeue=True)
+ await message.reject(requeue=True)
logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
async def start_consuming(self):
channel = await self._connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
- async with queue.iterator() as queue_iter:
- async for message in queue_iter:
+ queue_iter = queue.iterator()
+ async for message in queue_iter:
+ try:
await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+ except aio_pika.exceptions.MessageProcessError:
+ # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+ # connection has died, and attempting to close the queue will only raise another exception.
+ raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+ except Exception as e:
+ queue_iter.close()
+ raise e
+
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 8c25532..0dda59e 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -8,3 +8,7 @@ class PipelineRunningError(Exception):
class TileProcessingError(Exception):
pass
+
+
+class ConnectionErrorRabbitMQ(Exception):
+ pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 71607c2..d0c2344 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,3 +1,4 @@
-from .Exceptions import TileProcessingError
+from .Exceptions import ConnectionErrorRabbitMQ
from .Exceptions import PipelineBuildingError
from .Exceptions import PipelineRunningError
+from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 5a8fc2d..6e1a289 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -23,6 +23,7 @@ from granule_ingester.consumer import Consumer
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.writers import CassandraStore
from granule_ingester.writers import SolrStore
+import sys
def cassandra_factory(contact_points, port):
@@ -104,15 +105,20 @@ async def main():
rabbitmq_queue=args.rabbitmq_queue,
data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
metadata_store_factory=partial(solr_factory, solr_host_and_port))
- if await run_health_checks(
- [CassandraStore(cassandra_contact_points, cassandra_port),
- SolrStore(solr_host_and_port),
- consumer]):
- async with consumer:
- logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
- await consumer.start_consuming()
- else:
- logger.error("Quitting because not all dependencies passed the health checks.")
+ try:
+ if await run_health_checks(
+ [CassandraStore(cassandra_contact_points, cassandra_port),
+ SolrStore(solr_host_and_port),
+ consumer]):
+ async with consumer:
+ logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
+ await consumer.start_consuming()
+ else:
+ logger.error("Quitting because not all dependencies passed the health checks.")
+ sys.exit(1)
+ except Exception as e:
+ logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+ sys.exit(1)
if __name__ == '__main__':
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index c7b5d6a..e52d99f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -22,6 +22,7 @@ import aiomultiprocess
import xarray as xr
import yaml
from aiomultiprocess.types import ProxyException
+from cassandra.cluster import NoHostAvailable
from nexusproto import DataTile_pb2 as nexusproto
from yaml.scanner import ScannerError
@@ -66,8 +67,11 @@ async def _process_tile_in_worker(serialized_input_tile: str):
processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
if processed_tile:
+ # try:
await _worker_data_store.save_data(processed_tile)
await _worker_metadata_store.save_metadata(processed_tile)
+ # except NoHostAvailable as e:
+ # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
def _recurse(processor_list: List[TileProcessor],
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 7a9f146..ffac63d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,7 +18,7 @@ import asyncio
import logging
import uuid
-from cassandra.cluster import Cluster, Session
+from cassandra.cluster import Cluster, Session, NoHostAvailable
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
from nexusproto.DataTile_pb2 import NexusTile, TileData
@@ -65,10 +65,13 @@ class CassandraStore(DataStore):
self._session.shutdown()
async def save_data(self, tile: NexusTile) -> None:
- tile_id = uuid.UUID(tile.summary.tile_id)
- serialized_tile_data = TileData.SerializeToString(tile.tile)
- prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
- await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+ try:
+ tile_id = uuid.UUID(tile.summary.tile_id)
+ serialized_tile_data = TileData.SerializeToString(tile.tile)
+ prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+ await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+ except NoHostAvailable as e:
+ logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):
[incubator-sdap-ingester] 02/10: better exception handling
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 7feb58d8099d6441b0b220998c1fddf27ee04195
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 9 11:31:28 2020 -0500
better exception handling
---
.../granule_ingester/consumer/Consumer.py | 5 ++++-
.../granule_ingester/exceptions/Exceptions.py | 10 +++++++++-
.../granule_ingester/exceptions/__init__.py | 4 +++-
.../granule_ingester/pipeline/Pipeline.py | 13 ++++++++++---
.../reading_processors/TileReadingProcessor.py | 20 +++++++++-----------
granule_ingester/requirements.txt | 2 +-
6 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 31454c1..fadfe75 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,9 +17,9 @@ import logging
import aio_pika
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
-from granule_ingester.exceptions import PipelineBuildingError
logger = logging.getLogger(__name__)
@@ -79,6 +79,9 @@ class Consumer(HealthCheck):
message.reject()
logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
f"from RabbitMQ. The exception was:\n{e}")
+ except PipelineRunningError as e:
+ message.reject()
+ logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
except Exception as e:
message.reject(requeue=True)
logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 4c03e48..8c25532 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -1,2 +1,10 @@
class PipelineBuildingError(Exception):
- pass
\ No newline at end of file
+ pass
+
+
+class PipelineRunningError(Exception):
+ pass
+
+
+class TileProcessingError(Exception):
+ pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index a36b19a..71607c2 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1 +1,3 @@
-from .Exceptions import PipelineBuildingError
\ No newline at end of file
+from .Exceptions import TileProcessingError
+from .Exceptions import PipelineBuildingError
+from .Exceptions import PipelineRunningError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f872e4d..c7b5d6a 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,13 +17,15 @@
import logging
import time
from typing import List
-from granule_ingester.exceptions import PipelineBuildingError
+
import aiomultiprocess
import xarray as xr
import yaml
-from yaml.scanner import ScannerError
+from aiomultiprocess.types import ProxyException
from nexusproto import DataTile_pb2 as nexusproto
+from yaml.scanner import ScannerError
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -62,6 +64,7 @@ async def _process_tile_in_worker(serialized_input_tile: str):
input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
+
if processed_tile:
await _worker_data_store.save_data(processed_tile)
await _worker_metadata_store.save_metadata(processed_tile)
@@ -149,7 +152,11 @@ class Pipeline:
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
- await pool.map(_process_tile_in_worker, chunk)
+ try:
+ await pool.map(_process_tile_in_worker, chunk)
+ except ProxyException:
+ pool.terminate()
+ raise PipelineRunningError("Running the pipeline failed and could not recover.")
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
import xarray as xr
from nexusproto import DataTile_pb2 as nexusproto
+from granule_ingester.exceptions import TileProcessingError
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
self.latitude = latitude
self.longitude = longitude
- # Common optional properties
- self.temp_dir = None
- self.metadata = None
- # self.temp_dir = self.environ['TEMP_DIR']
- # self.metadata = self.environ['META']
-
def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
- dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+ try:
+ dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
- output_tile = nexusproto.NexusTile()
- output_tile.CopyFrom(tile)
- output_tile.summary.data_var_name = self.variable_to_read
+ output_tile = nexusproto.NexusTile()
+ output_tile.CopyFrom(tile)
+ output_tile.summary.data_var_name = self.variable_to_read
- return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+ return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+ except Exception:
+ raise TileProcessingError("Could not generate tiles from the granule.")
@abstractmethod
def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 4d9d4cb..a6d64a2 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,3 @@
cassandra-driver==3.23.0
-aiomultiprocess
+aiomultiprocess==0.7.0
aioboto3
[incubator-sdap-ingester] 08/10: use pysolr
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit fd5cc2218ff628331bc9e07fe755a9f594bf4a5c
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 16 18:24:00 2020 -0700
use pysolr
---
granule_ingester/docker/entrypoint.sh | 1 +
.../granule_ingester/exceptions/Exceptions.py | 3 +
.../granule_ingester/exceptions/__init__.py | 1 +
granule_ingester/granule_ingester/main.py | 19 ++---
.../granule_ingester/pipeline/Pipeline.py | 4 +-
.../granule_ingester/writers/SolrStore.py | 81 ++++++++++++++--------
granule_ingester/requirements.txt | 4 +-
7 files changed, 74 insertions(+), 39 deletions(-)
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index e6f7262..3a1cb9b 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -8,3 +8,4 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
$([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
$([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+ $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index ca60608..c648b99 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -21,6 +21,9 @@ class RabbitMQLostConnectionError(LostConnectionError):
class CassandraLostConnectionError(LostConnectionError):
pass
+class SolrLostConnectionError(LostConnectionError):
+ pass
+
class FailedHealthCheckError(Exception):
pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 31cc5b8..ea0969f 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -7,4 +7,5 @@ from .Exceptions import PipelineRunningError
from .Exceptions import RabbitMQFailedHealthCheckError
from .Exceptions import RabbitMQLostConnectionError
from .Exceptions import SolrFailedHealthCheckError
+from .Exceptions import SolrLostConnectionError
from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 45877c2..f602da8 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,16 +15,15 @@
import argparse
import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
import logging
+import sys
from functools import partial
from typing import List
from granule_ingester.consumer import Consumer
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore
-from granule_ingester.writers import SolrStore
-import sys
+from granule_ingester.writers import CassandraStore, SolrStore
def cassandra_factory(contact_points, port):
@@ -33,8 +32,8 @@ def cassandra_factory(contact_points, port):
return store
-def solr_factory(solr_host_and_port):
- store = SolrStore(solr_host_and_port)
+def solr_factory(solr_host_and_port, zk_host_and_port):
+ store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
store.connect()
return store
@@ -78,6 +77,8 @@ async def main(loop):
default='http://localhost:8983',
metavar='HOST:PORT',
help='Solr host and port. (Default: http://localhost:8983)')
+ parser.add_argument('--zk_host_and_port',
+ metavar="HOST:PORT")
parser.add_argument('-v',
'--verbose',
action='store_true',
@@ -99,16 +100,18 @@ async def main(loop):
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
solr_host_and_port = args.solr_host_and_port
+ zk_host_and_port = args.zk_host_and_port
consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
rabbitmq_username=args.rabbitmq_username,
rabbitmq_password=args.rabbitmq_password,
rabbitmq_queue=args.rabbitmq_queue,
data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
- metadata_store_factory=partial(solr_factory, solr_host_and_port))
+ metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
try:
+ solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
- SolrStore(solr_host_and_port),
+ solr_store,
consumer])
async with consumer:
logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f1aa021..a667d5e 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -27,7 +27,7 @@ from nexusproto import DataTile_pb2 as nexusproto
from tblib import pickling_support
from yaml.scanner import ScannerError
-from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
+from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -180,6 +180,8 @@ class Pipeline:
await pool.map(_process_tile_in_worker, chunk)
except ProxyException:
pool.terminate()
+ # Give the shared memory manager some time to write the exception
+ # await asyncio.sleep(1)
raise pickle.loads(shared_memory.error)
end = time.perf_counter()
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 6baad28..926a75c 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,64 +13,87 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from asyncio import AbstractEventLoop
-
+import asyncio
+import functools
+import json
import logging
+from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
from typing import Dict
-import aiohttp
+import pysolr
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.exceptions import NoNodeError
from nexusproto.DataTile_pb2 import *
-from tenacity import *
+from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
from granule_ingester.writers.MetadataStore import MetadataStore
-from granule_ingester.exceptions import SolrFailedHealthCheckError
+
logger = logging.getLogger(__name__)
+def run_in_executor(f):
+ @functools.wraps(f)
+ def inner(*args, **kwargs):
+ loop = asyncio.get_running_loop()
+ return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+ return inner
+
+
class SolrStore(MetadataStore):
- def __init__(self, host_and_port='http://localhost:8983'):
+ def __init__(self, solr_url=None, zk_url=None):
super().__init__()
self.TABLE_NAME = "sea_surface_temp"
self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
- self._host_and_port = host_and_port
+ self._solr_url = solr_url
+ self._zk_url = zk_url
self.geo_precision: int = 3
- self.collection: str = "nexustiles"
+ self._collection: str = "nexustiles"
self.log: logging.Logger = logging.getLogger(__name__)
self.log.setLevel(logging.DEBUG)
- self._session = None
+ self._solr = None
+
+ def _get_connection(self) -> pysolr.Solr:
+ if self._zk_url:
+ zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+ collections = {}
+ for c in zk.zk.get_children("collections"):
+ collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
+ zk.collections = collections
+ return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+ elif self._solr_url:
+ return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+ else:
+ raise RuntimeError("You must provide either solr_host or zookeeper_host.")
def connect(self, loop: AbstractEventLoop = None):
- self._session = aiohttp.ClientSession(loop=loop)
+ self._solr = self._get_connection()
async def health_check(self):
try:
- async with aiohttp.ClientSession() as session:
- response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
- if response.status == 200:
- return True
- else:
- logger.error("Solr health check returned status {}.".format(response.status))
- except aiohttp.ClientConnectionError as e:
- raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
-
- return False
+ connection = self._get_connection()
+ connection.ping()
+ except pysolr.SolrError:
+ raise SolrFailedHealthCheckError("Cannot connect to Solr!")
+ except NoNodeError:
+ raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
+ except KazooTimeoutError:
+ raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
async def save_metadata(self, nexus_tile: NexusTile) -> None:
solr_doc = self._build_solr_doc(nexus_tile)
+ await self._save_document(solr_doc)
- await self._save_document(self.collection, solr_doc)
-
- @retry(stop=stop_after_attempt(5))
- async def _save_document(self, collection: str, doc: dict):
- url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
- response = await self._session.post(url, json=doc)
- if response.status < 200 or response.status >= 400:
- raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+ @run_in_executor
+ def _save_document(self, doc: dict):
+ try:
+ self._solr.add([doc])
+ except pysolr.SolrError:
+ raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
def _build_solr_doc(self, tile: NexusTile) -> Dict:
summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 0479f99..9b06860 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,4 +1,6 @@
cassandra-driver==3.23.0
aiomultiprocess==0.7.0
aioboto3
-tblib==1.6.0
\ No newline at end of file
+tblib==1.6.0
+pysolr==3.9.0
+kazoo==2.8.0
\ No newline at end of file
[incubator-sdap-ingester] 06/10: exc handling
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 41755a5f1663085094c9d55de8c183748b65bb63
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 14:08:05 2020 -0700
exc handling
---
granule_ingester/granule_ingester/consumer/Consumer.py | 11 +++++++----
granule_ingester/granule_ingester/exceptions/Exceptions.py | 10 ++++++----
granule_ingester/granule_ingester/exceptions/__init__.py | 5 +++--
granule_ingester/granule_ingester/main.py | 5 ++++-
granule_ingester/granule_ingester/writers/CassandraStore.py | 4 ++--
5 files changed, 22 insertions(+), 13 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 59db4e8..d40b54c 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,8 +17,8 @@ import logging
import aio_pika
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
- RabbitMQFailedHealthCheckError
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+ RabbitMQFailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
@@ -83,6 +83,9 @@ class Consumer(HealthCheck):
except PipelineRunningError as e:
await message.reject()
logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+ except LostConnectionError:
+ # Let main() handle this
+ raise
except Exception as e:
await message.reject(requeue=True)
logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
@@ -98,7 +101,7 @@ class Consumer(HealthCheck):
except aio_pika.exceptions.MessageProcessError:
# Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
# connection has died, and attempting to close the queue will only raise another exception.
- raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
+ raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
except Exception as e:
- queue_iter.close()
+ await queue_iter.close()
raise e
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index f43bc2f..ca60608 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -10,11 +10,15 @@ class TileProcessingError(Exception):
pass
-class RabbitMQConnectionError(Exception):
+class LostConnectionError(Exception):
pass
-class CassandraConnectionError(Exception):
+class RabbitMQLostConnectionError(LostConnectionError):
+ pass
+
+
+class CassandraLostConnectionError(LostConnectionError):
pass
@@ -32,5 +36,3 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
pass
-
-
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 400c9bf..31cc5b8 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,9 +1,10 @@
-from .Exceptions import CassandraConnectionError
from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import CassandraLostConnectionError
from .Exceptions import FailedHealthCheckError
+from .Exceptions import LostConnectionError
from .Exceptions import PipelineBuildingError
from .Exceptions import PipelineRunningError
-from .Exceptions import RabbitMQConnectionError
from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import RabbitMQLostConnectionError
from .Exceptions import SolrFailedHealthCheckError
from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 2754c7f..b9d475b 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,7 +15,7 @@
import argparse
import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
import logging
from functools import partial
from typing import List
@@ -116,6 +116,9 @@ async def main():
except FailedHealthCheckError as e:
logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
sys.exit(1)
+ except LostConnectionError as e:
+ logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
+ sys.exit(1)
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
sys.exit(1)
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index fbb5a7d..791911e 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -24,7 +24,7 @@ from cassandra.cqlengine.models import Model
from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
from nexusproto.DataTile_pb2 import NexusTile, TileData
-from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraConnectionError
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
from granule_ingester.writers.DataStore import DataStore
logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -77,7 +77,7 @@ class CassandraStore(DataStore):
await self._execute_query_async(self._session, prepared_query,
[tile_id, bytearray(serialized_tile_data)])
except Exception:
- raise CassandraConnectionError(f"Cannot connect to Cassandra to save tile.")
+ raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):
[incubator-sdap-ingester] 05/10: propagate child worker exceptions
up to main process
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit de8c5a00a7d8589f072ac5865cdde102864ae298
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 13:52:50 2020 -0700
propagate child worker exceptions up to main process
---
.../services/MessagePublisher.py | 2 +-
.../granule_ingester/exceptions/Exceptions.py | 6 +++
.../granule_ingester/exceptions/__init__.py | 1 +
.../granule_ingester/pipeline/Pipeline.py | 53 ++++++++++++++--------
.../granule_ingester/writers/CassandraStore.py | 17 ++++---
granule_ingester/requirements.txt | 1 +
6 files changed, 53 insertions(+), 27 deletions(-)
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index 559a69d..f7a5517 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -27,7 +27,7 @@ class MessagePublisher:
:return: None
"""
properties = pika.BasicProperties(content_type='text/plain',
- delivery_mode=1,
+ delivery_mode=2,
priority=priority)
self._channel.basic_publish(exchange='',
routing_key=self._queue,
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 6e7d89a..f43bc2f 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -14,6 +14,10 @@ class RabbitMQConnectionError(Exception):
pass
+class CassandraConnectionError(Exception):
+ pass
+
+
class FailedHealthCheckError(Exception):
pass
@@ -28,3 +32,5 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
pass
+
+
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 2ba1b4a..400c9bf 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,3 +1,4 @@
+from .Exceptions import CassandraConnectionError
from .Exceptions import CassandraFailedHealthCheckError
from .Exceptions import FailedHealthCheckError
from .Exceptions import PipelineBuildingError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e52d99f..14dc032 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,20 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
+import pickle
import time
+from multiprocessing import Manager
from typing import List
import aiomultiprocess
import xarray as xr
import yaml
from aiomultiprocess.types import ProxyException
-from cassandra.cluster import NoHostAvailable
from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
from yaml.scanner import ScannerError
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -41,13 +42,15 @@ _worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
_worker_processor_list: List[TileProcessor] = None
_worker_dataset = None
+_shared_memory = None
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
global _worker_data_store
global _worker_metadata_store
global _worker_processor_list
global _worker_dataset
+ global _shared_memory
# _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
# however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -55,23 +58,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
_worker_metadata_store = metadata_store_factory()
_worker_processor_list = processor_list
_worker_dataset = dataset
+ _shared_memory = shared_memory
async def _process_tile_in_worker(serialized_input_tile: str):
- global _worker_data_store
- global _worker_metadata_store
- global _worker_processor_list
- global _worker_dataset
+ try:
+ input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+ processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
- input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
- processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
-
- if processed_tile:
- # try:
- await _worker_data_store.save_data(processed_tile)
- await _worker_metadata_store.save_metadata(processed_tile)
- # except NoHostAvailable as e:
- # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
+ if processed_tile:
+ await _worker_data_store.save_data(processed_tile)
+ await _worker_metadata_store.save_metadata(processed_tile)
+ except Exception as e:
+ pickling_support.install(e)
+ _shared_memory.error = pickle.dumps(e)
+ raise
def _recurse(processor_list: List[TileProcessor],
@@ -96,10 +97,15 @@ class Pipeline:
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
+ # Create a SyncManager Namespace so that we can communicate exceptions from the
+ # worker processes back to the main process.
+ self._shared_memory = Manager().Namespace()
+
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
try:
config = yaml.load(config_str, yaml.FullLoader)
+ cls._validate_config(config)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
@@ -108,6 +114,12 @@ class Pipeline:
except yaml.scanner.ScannerError:
raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+ # TODO: this method should validate the config against an actual schema definition
+ @staticmethod
+ def _validate_config(config: dict):
+ if type(config) is not dict:
+ raise PipelineBuildingError("Cannot build pipeline because the config is not valid YAML.")
+
@classmethod
def _build_pipeline(cls,
config: dict,
@@ -150,17 +162,18 @@ class Pipeline:
initargs=(self._tile_processors,
dataset,
self._data_store_factory,
- self._metadata_store_factory)) as pool:
+ self._metadata_store_factory,
+ self._shared_memory)) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
- for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+ for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
try:
await pool.map(_process_tile_in_worker, chunk)
except ProxyException:
pool.terminate()
- raise PipelineRunningError("Running the pipeline failed and could not recover.")
+ raise pickle.loads(self._shared_memory.error)
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 530871d..fbb5a7d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -21,9 +21,10 @@ import uuid
from cassandra.cluster import Cluster, Session, NoHostAvailable
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
+from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
from nexusproto.DataTile_pb2 import NexusTile, TileData
-from granule_ingester.exceptions import CassandraFailedHealthCheckError
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraConnectionError
from granule_ingester.writers.DataStore import DataStore
logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -52,7 +53,11 @@ class CassandraStore(DataStore):
raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
def _get_session(self) -> Session:
- cluster = Cluster(contact_points=self._contact_points, port=self._port)
+ cluster = Cluster(contact_points=self._contact_points,
+ port=self._port,
+ # load_balancing_policy=
+ reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
+ default_retry_policy=RetryPolicy())
session = cluster.connect()
session.set_keyspace('nexustiles')
return session
@@ -69,10 +74,10 @@ class CassandraStore(DataStore):
tile_id = uuid.UUID(tile.summary.tile_id)
serialized_tile_data = TileData.SerializeToString(tile.tile)
prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
- await type(self)._execute_query_async(self._session, prepared_query,
- [tile_id, bytearray(serialized_tile_data)])
- except NoHostAvailable as e:
- logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
+ await self._execute_query_async(self._session, prepared_query,
+ [tile_id, bytearray(serialized_tile_data)])
+ except Exception:
+ raise CassandraConnectionError(f"Cannot connect to Cassandra to save tile.")
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index a6d64a2..0479f99 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,4 @@
cassandra-driver==3.23.0
aiomultiprocess==0.7.0
aioboto3
+tblib==1.6.0
\ No newline at end of file
[incubator-sdap-ingester] 01/10: better error handling
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 6efb623692fe6395518c3bac83d45c8bef9556fc
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 8 20:16:29 2020 -0500
better error handling
---
.../granule_ingester/consumer/Consumer.py | 7 +++-
.../granule_ingester/exceptions/Exceptions.py | 2 ++
.../granule_ingester/exceptions/__init__.py | 1 +
.../granule_ingester/pipeline/Pipeline.py | 39 +++++++++++-----------
granule_ingester/tests/pipeline/test_Pipeline.py | 9 ++---
5 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..31454c1 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -19,6 +19,7 @@ import aio_pika
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
+from granule_ingester.exceptions import PipelineBuildingError
logger = logging.getLogger(__name__)
@@ -74,9 +75,13 @@ class Consumer(HealthCheck):
metadata_store_factory=metadata_store_factory)
await pipeline.run()
message.ack()
+ except PipelineBuildingError as e:
+ message.reject()
+ logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+ f"from RabbitMQ. The exception was:\n{e}")
except Exception as e:
message.reject(requeue=True)
- logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+ logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
async def start_consuming(self):
channel = await self._connection.channel()
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
new file mode 100644
index 0000000..4c03e48
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -0,0 +1,2 @@
+class PipelineBuildingError(Exception):
+ pass
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
new file mode 100644
index 0000000..a36b19a
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -0,0 +1 @@
+from .Exceptions import PipelineBuildingError
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..f872e4d 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,10 +17,11 @@
import logging
import time
from typing import List
-
+from granule_ingester.exceptions import PipelineBuildingError
import aiomultiprocess
import xarray as xr
import yaml
+from yaml.scanner import ScannerError
from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.granule_loaders import GranuleLoader
@@ -90,38 +91,38 @@ class Pipeline:
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
- config = yaml.load(config_str, yaml.FullLoader)
- return cls._build_pipeline(config,
- data_store_factory,
- metadata_store_factory,
- processor_module_mappings)
-
- @classmethod
- def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
- with open(config_path) as config_file:
- config = yaml.load(config_file, yaml.FullLoader)
+ try:
+ config = yaml.load(config_str, yaml.FullLoader)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
processor_module_mappings)
+ except yaml.scanner.ScannerError:
+ raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
@classmethod
def _build_pipeline(cls,
config: dict,
data_store_factory,
metadata_store_factory,
module_mappings: dict):
- granule_loader = GranuleLoader(**config['granule'])
+ try:
+ granule_loader = GranuleLoader(**config['granule'])
- slicer_config = config['slicer']
- slicer = cls._parse_module(slicer_config, module_mappings)
+ slicer_config = config['slicer']
+ slicer = cls._parse_module(slicer_config, module_mappings)
- tile_processors = []
- for processor_config in config['processors']:
- module = cls._parse_module(processor_config, module_mappings)
- tile_processors.append(module)
+ tile_processors = []
+ for processor_config in config['processors']:
+ module = cls._parse_module(processor_config, module_mappings)
+ tile_processors.append(module)
- return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+ return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+ except KeyError as e:
+ raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+ except Exception:
+ raise PipelineBuildingError("Cannot build pipeline.")
@classmethod
def _parse_module(cls, module_config: dict, module_mappings: dict):
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
pass
relative_path = "../config_files/ingestion_config_testfile.yaml"
- file_path = os.path.join(os.path.dirname(__file__), relative_path)
- pipeline = Pipeline.from_file(config_path=str(file_path),
- data_store_factory=MockDataStore,
- metadata_store_factory=MockMetadataStore)
+ with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+ yaml_str = file.read()
+ pipeline = Pipeline.from_string(config_str=yaml_str,
+ data_store_factory=MockDataStore,
+ metadata_store_factory=MockMetadataStore)
self.assertEqual(pipeline._data_store_factory, MockDataStore)
self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)
[incubator-sdap-ingester] 07/10: error handling
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 385f5b83a169bf723608512fb98045b53f8dd5f2
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 17:36:08 2020 -0700
error handling
---
granule_ingester/granule_ingester/main.py | 8 ++---
.../granule_ingester/pipeline/Pipeline.py | 35 +++++++++++++---------
.../granule_ingester/writers/CassandraStore.py | 2 +-
3 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index b9d475b..45877c2 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -46,7 +46,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
return True
-async def main():
+async def main(loop):
parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
'and ingest a granule for each message that comes through.')
parser.add_argument('--rabbitmq_host',
@@ -115,14 +115,14 @@ async def main():
await consumer.start_consuming()
except FailedHealthCheckError as e:
logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
- sys.exit(1)
except LostConnectionError as e:
logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
- sys.exit(1)
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+ finally:
sys.exit(1)
if __name__ == '__main__':
- asyncio.run(main())
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(main(loop))
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 14dc032..f1aa021 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -19,15 +19,15 @@ import time
from multiprocessing import Manager
from typing import List
-import aiomultiprocess
import xarray as xr
import yaml
+from aiomultiprocess import Pool
from aiomultiprocess.types import ProxyException
from nexusproto import DataTile_pb2 as nexusproto
from tblib import pickling_support
from yaml.scanner import ScannerError
-from granule_ingester.exceptions import PipelineBuildingError
+from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -36,7 +36,9 @@ from granule_ingester.writers import DataStore, MetadataStore
logger = logging.getLogger(__name__)
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocess library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
_worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
@@ -97,9 +99,12 @@ class Pipeline:
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
- # Create a SyncManager Namespace so that we can to communicate exceptions from the
+ # Create a SyncManager so that we can communicate exceptions from the
# worker processes back to the main process.
- self._shared_memory = Manager().Namespace()
+ self._manager = Manager()
+
+ def __del__(self):
+ self._manager.shutdown()
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
@@ -158,26 +163,28 @@ class Pipeline:
async def run(self):
async with self._granule_loader as (dataset, granule_name):
start = time.perf_counter()
- async with aiomultiprocess.Pool(initializer=_init_worker,
- initargs=(self._tile_processors,
- dataset,
- self._data_store_factory,
- self._metadata_store_factory,
- self._shared_memory)) as pool:
+
+ shared_memory = self._manager.Namespace()
+ async with Pool(initializer=_init_worker,
+ initargs=(self._tile_processors,
+ dataset,
+ self._data_store_factory,
+ self._metadata_store_factory,
+ shared_memory)) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
- for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+ for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
try:
await pool.map(_process_tile_in_worker, chunk)
except ProxyException:
pool.terminate()
- raise pickle.loads(self._shared_memory.error)
+ raise pickle.loads(shared_memory.error)
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
@staticmethod
- def _chunk_list(items, chunk_size):
+ def _chunk_list(items, chunk_size: int):
return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 791911e..6b2cf32 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -76,7 +76,7 @@ class CassandraStore(DataStore):
prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
await self._execute_query_async(self._session, prepared_query,
[tile_id, bytearray(serialized_tile_data)])
- except Exception:
+ except NoHostAvailable:
raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
@staticmethod
[incubator-sdap-ingester] 10/10: use asyncio in the collection
ingester
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 252500173ead6154a9b5110ee84604b080be742d
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 28 15:09:03 2020 -0700
use asyncio in the collection ingester
---
collection_manager/collection_manager/main.py | 34 +++++++++---------
.../services/CollectionProcessor.py | 8 ++---
.../services/CollectionWatcher.py | 34 ++++++++++--------
.../services/MessagePublisher.py | 41 +++++++++++-----------
collection_manager/requirements.txt | 2 ++
5 files changed, 63 insertions(+), 56 deletions(-)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 43b687e..cbe22f9 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -63,25 +63,23 @@ async def main():
history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
else:
history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
- publisher = MessagePublisher(host=options.rabbitmq_host,
- username=options.rabbitmq_username,
- password=options.rabbitmq_password,
- queue=options.rabbitmq_queue)
- publisher.connect()
- collection_processor = CollectionProcessor(message_publisher=publisher,
- history_manager_builder=history_manager_builder)
- collection_watcher = CollectionWatcher(collections_path=options.collections_path,
- collection_updated_callback=collection_processor.process_collection,
- granule_updated_callback=collection_processor.process_granule,
- collections_refresh_interval=int(options.refresh))
+ async with MessagePublisher(host=options.rabbitmq_host,
+ username=options.rabbitmq_username,
+ password=options.rabbitmq_password,
+ queue=options.rabbitmq_queue) as publisher:
+ collection_processor = CollectionProcessor(message_publisher=publisher,
+ history_manager_builder=history_manager_builder)
+ collection_watcher = CollectionWatcher(collections_path=options.collections_path,
+ collection_updated_callback=collection_processor.process_collection,
+ granule_updated_callback=collection_processor.process_granule,
+ collections_refresh_interval=int(options.refresh))
- collection_watcher.start_watching()
-
- while True:
- try:
- await asyncio.sleep(1)
- except KeyboardInterrupt:
- return
+ await collection_watcher.start_watching()
+ while True:
+ try:
+ await asyncio.sleep(1)
+ except KeyboardInterrupt:
+ return
except Exception as e:
logger.exception(e)
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 232cdee..d790f4b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -25,16 +25,16 @@ class CollectionProcessor:
with open(MESSAGE_TEMPLATE, 'r') as config_template_file:
self._config_template = config_template_file.read()
- def process_collection(self, collection: Collection):
+ async def process_collection(self, collection: Collection):
"""
Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
:param collection: A Collection definition
:return: None
"""
for granule in collection.files_owned():
- self.process_granule(granule, collection)
+ await self.process_granule(granule, collection)
- def process_granule(self, granule: str, collection: Collection):
+ async def process_granule(self, granule: str, collection: Collection):
"""
Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
:param granule: A path to a granule file
@@ -64,7 +64,7 @@ class CollectionProcessor:
return
dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
- self._publisher.publish_message(body=dataset_config, priority=use_priority)
+ await self._publisher.publish_message(body=dataset_config, priority=use_priority)
history_manager.push(granule)
@staticmethod
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 2387016..0d7da84 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -2,7 +2,6 @@ import asyncio
import logging
import os
from collections import defaultdict
-from functools import partial
from typing import Dict, Callable, Set, Optional
import yaml
@@ -38,7 +37,7 @@ class CollectionWatcher:
self._granule_watches = set()
- def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
+ async def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
"""
Periodically load the Collections Configuration file to check for changes,
and observe filesystem events for added/modified granules. When an event occurs,
@@ -46,7 +45,7 @@ class CollectionWatcher:
:return: None
"""
- self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
+ await self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
self._observer.start()
def collections(self) -> Set[Collection]:
@@ -99,11 +98,11 @@ class CollectionWatcher:
self._load_collections()
return self.collections() - old_collections
- def _reload_and_reschedule(self):
+ async def _reload_and_reschedule(self):
try:
updated_collections = self._get_updated_collections()
for collection in updated_collections:
- self._collection_updated_callback(collection)
+ await self._collection_updated_callback(collection)
if len(updated_collections) > 0:
self._unschedule_watches()
self._schedule_watches()
@@ -117,7 +116,9 @@ class CollectionWatcher:
def _schedule_watches(self):
for directory, collections in self._collections_by_dir.items():
- granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+ granule_event_handler = _GranuleEventHandler(asyncio.get_running_loop(),
+ self._granule_updated_callback,
+ collections)
# Note: the Watchdog library does not schedule a new watch
# if one is already scheduled for the same directory
try:
@@ -127,18 +128,22 @@ class CollectionWatcher:
logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
@classmethod
- def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args):
+ async def _run_periodically(cls,
+ loop: Optional[asyncio.AbstractEventLoop],
+ wait_time: float,
+ coro,
+ *args):
"""
Call a function periodically. This uses asyncio, and is non-blocking.
:param loop: An optional event loop to use. If None, the current running event loop will be used.
:param wait_time: seconds to wait between iterations of func
- :param func: the function that will be run
+ :param coro: the coroutine that will be awaited
:param args: any args that need to be provided to func
"""
if loop is None:
loop = asyncio.get_running_loop()
- func(*args)
- loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func))
+ await coro(*args)
+ loop.call_later(wait_time, asyncio.create_task, cls._run_periodically(loop, wait_time, coro))
class _GranuleEventHandler(FileSystemEventHandler):
@@ -146,15 +151,16 @@ class _GranuleEventHandler(FileSystemEventHandler):
EventHandler that watches for new or modified granule files.
"""
- def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
- self._callback = callback
+ def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro, collections_for_dir: Set[Collection]):
+ self._loop = loop
+ self._callback_coro = callback_coro
self._collections_for_dir = collections_for_dir
def on_created(self, event):
super().on_created(event)
for collection in self._collections_for_dir:
if collection.owns_file(event.src_path):
- self._callback(event.src_path, collection)
+ self._loop.create_task(self._callback_coro(event.src_path, collection))
def on_modified(self, event):
super().on_modified(event)
@@ -163,4 +169,4 @@ class _GranuleEventHandler(FileSystemEventHandler):
for collection in self._collections_for_dir:
if collection.owns_file(event.src_path):
- self._callback(event.src_path, collection)
+ self._loop.create_task(self._callback_coro(event.src_path, collection))
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index f7a5517..75803d1 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -1,4 +1,5 @@
-import pika
+from aio_pika import Message, DeliveryMode, Connection, Channel, connect_robust
+from tenacity import retry, stop_after_attempt, wait_fixed
class MessagePublisher:
@@ -6,34 +7,34 @@ class MessagePublisher:
def __init__(self, host: str, username: str, password: str, queue: str):
self._connection_string = f"amqp://{username}:{password}@{host}/"
self._queue = queue
- self._channel = None
- self._connection = None
+ self._channel: Channel = None
+ self._connection: Connection = None
- def connect(self):
+ async def __aenter__(self):
+ await self._connect()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ if self._connection:
+ await self._connection.close()
+
+ async def _connect(self):
"""
Establish a connection to RabbitMQ.
:return: None
"""
- parameters = pika.URLParameters(self._connection_string)
- self._connection = pika.BlockingConnection(parameters)
- self._channel = self._connection.channel()
- self._channel.queue_declare(self._queue, durable=True)
+ self._connection = await connect_robust(self._connection_string)
+ self._channel = await self._connection.channel()
+ await self._channel.declare_queue(self._queue, durable=True)
- def publish_message(self, body: str, priority: int = None):
+ @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4))
+ async def publish_message(self, body: str, priority: int = None):
"""
Publish a message to RabbitMQ using the optional message priority.
:param body: A string to publish to RabbitMQ
:param priority: An optional integer priority to use for the message
:return: None
"""
- properties = pika.BasicProperties(content_type='text/plain',
- delivery_mode=2,
- priority=priority)
- self._channel.basic_publish(exchange='',
- routing_key=self._queue,
- body=body,
- properties=properties)
-
- def __del__(self):
- if self._connection:
- self._connection.close()
+ message = Message(body=body.encode('utf-8'), priority=priority, delivery_mode=DeliveryMode.PERSISTENT)
+ await self._channel.default_exchange.publish(message, routing_key=self._queue)
+
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index f16bde3..47ae867 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,3 +4,5 @@ pysolr==3.8.1
pika==1.1.0
watchdog==0.10.2
requests==2.23.0
+aio-pika==6.6.1
+tenacity==6.2.0
\ No newline at end of file