You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by tl...@apache.org on 2021/04/09 01:17:30 UTC
[incubator-sdap-ingester] 11/33: SDAP-271 Cassandra authentication
support (#11)
This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit a9a0629405199049a822e938f46d43b575448629
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 15:16:40 2020 -0700
SDAP-271 Cassandra authentication support (#11)
---
granule_ingester/docker/entrypoint.sh | 2 ++
.../granule_ingester/exceptions/Exceptions.py | 41 ++++++++++++++++++++++
.../granule_ingester/exceptions/__init__.py | 11 ++++++
granule_ingester/granule_ingester/main.py | 30 ++++++++++++----
.../granule_ingester/writers/CassandraStore.py | 39 ++++++++++++++------
5 files changed, 106 insertions(+), 17 deletions(-)
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index e6f7262..b703ee3 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,4 +7,6 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
$([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
$([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
+ $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
+ $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
$([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
new file mode 100644
index 0000000..c648b99
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -0,0 +1,41 @@
+class PipelineBuildingError(Exception):
+ pass
+
+
+class PipelineRunningError(Exception):
+ pass
+
+
+class TileProcessingError(Exception):
+ pass
+
+
+class LostConnectionError(Exception):
+ pass
+
+
+class RabbitMQLostConnectionError(LostConnectionError):
+ pass
+
+
+class CassandraLostConnectionError(LostConnectionError):
+ pass
+
+class SolrLostConnectionError(LostConnectionError):
+ pass
+
+
+class FailedHealthCheckError(Exception):
+ pass
+
+
+class CassandraFailedHealthCheckError(FailedHealthCheckError):
+ pass
+
+
+class SolrFailedHealthCheckError(FailedHealthCheckError):
+ pass
+
+
+class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
+ pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
new file mode 100644
index 0000000..ea0969f
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -0,0 +1,11 @@
+from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import CassandraLostConnectionError
+from .Exceptions import FailedHealthCheckError
+from .Exceptions import LostConnectionError
+from .Exceptions import PipelineBuildingError
+from .Exceptions import PipelineRunningError
+from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import RabbitMQLostConnectionError
+from .Exceptions import SolrFailedHealthCheckError
+from .Exceptions import SolrLostConnectionError
+from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 5a8fc2d..9010e33 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -25,8 +25,8 @@ from granule_ingester.writers import CassandraStore
from granule_ingester.writers import SolrStore
-def cassandra_factory(contact_points, port):
- store = CassandraStore(contact_points, port)
+def cassandra_factory(contact_points, port, username, password):
+ store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password)
store.connect()
return store
@@ -72,6 +72,14 @@ async def main():
default=9042,
metavar="PORT",
help='Cassandra port. (Default: 9042)')
+ parser.add_argument('--cassandra_username',
+ metavar="USERNAME",
+ default=None,
+ help='Cassandra username. Optional.')
+ parser.add_argument('--cassandra_password',
+ metavar="PASSWORD",
+ default=None,
+ help='Cassandra password. Optional.')
parser.add_argument('--solr_host_and_port',
default='http://localhost:8983',
metavar='HOST:PORT',
@@ -94,6 +102,8 @@ async def main():
config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg)) for arg in vars(args)])
logger.info("Using configuration values:\n{}".format(config_values_str))
+ cassandra_username = args.cassandra_username
+ cassandra_password = args.cassandra_password
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
solr_host_and_port = args.solr_host_and_port
@@ -102,12 +112,18 @@ async def main():
rabbitmq_username=args.rabbitmq_username,
rabbitmq_password=args.rabbitmq_password,
rabbitmq_queue=args.rabbitmq_queue,
- data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
+ data_store_factory=partial(cassandra_factory,
+ cassandra_contact_points,
+ cassandra_port,
+ cassandra_username,
+ cassandra_password),
metadata_store_factory=partial(solr_factory, solr_host_and_port))
- if await run_health_checks(
- [CassandraStore(cassandra_contact_points, cassandra_port),
- SolrStore(solr_host_and_port),
- consumer]):
+ if await run_health_checks([CassandraStore(cassandra_contact_points,
+ cassandra_port,
+ cassandra_username,
+ cassandra_password),
+ SolrStore(solr_host_and_port),
+ consumer]):
async with consumer:
logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
await consumer.start_consuming()
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 7a9f146..cb5232b 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,11 +18,14 @@ import asyncio
import logging
import uuid
-from cassandra.cluster import Cluster, Session
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, Session, NoHostAvailable
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
+from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
from nexusproto.DataTile_pb2 import NexusTile, TileData
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
from granule_ingester.writers.DataStore import DataStore
logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -37,8 +40,10 @@ class TileModel(Model):
class CassandraStore(DataStore):
- def __init__(self, contact_points=None, port=9042):
+ def __init__(self, contact_points=None, port=9042, username=None, password=None):
self._contact_points = contact_points
+ self._username = username
+ self._password = password
self._port = port
self._session = None
@@ -47,12 +52,22 @@ class CassandraStore(DataStore):
session = self._get_session()
session.shutdown()
return True
- except:
- logger.error("Cannot connect to Cassandra!")
- return False
+ except Exception:
+ raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
def _get_session(self) -> Session:
- cluster = Cluster(contact_points=self._contact_points, port=self._port)
+
+ if self._username and self._password:
+ auth_provider = PlainTextAuthProvider(username=self._username, password=self._password)
+ else:
+ auth_provider = None
+
+ cluster = Cluster(contact_points=self._contact_points,
+ port=self._port,
+ # load_balancing_policy=
+ reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
+ default_retry_policy=RetryPolicy(),
+ auth_provider=auth_provider)
session = cluster.connect()
session.set_keyspace('nexustiles')
return session
@@ -65,10 +80,14 @@ class CassandraStore(DataStore):
self._session.shutdown()
async def save_data(self, tile: NexusTile) -> None:
- tile_id = uuid.UUID(tile.summary.tile_id)
- serialized_tile_data = TileData.SerializeToString(tile.tile)
- prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
- await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+ try:
+ tile_id = uuid.UUID(tile.summary.tile_id)
+ serialized_tile_data = TileData.SerializeToString(tile.tile)
+ prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+ await self._execute_query_async(self._session, prepared_query,
+ [tile_id, bytearray(serialized_tile_data)])
+ except NoHostAvailable:
+ raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):