You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 19:47:50 UTC

[incubator-sdap-ingester] 01/01: add cassandra auth and zookeeper support

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch cassandra-auth
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit ffad48fc868fc78520f8dcc369f23e64cac5b7c5
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 12:47:31 2020 -0700

    add cassandra auth and zookeeper support
---
 granule_ingester/docker/entrypoint.sh              |  5 +-
 .../granule_ingester/exceptions/Exceptions.py      | 41 +++++++++++
 .../granule_ingester/exceptions/__init__.py        | 11 +++
 granule_ingester/granule_ingester/main.py          | 61 ++++++++++++-----
 .../granule_ingester/writers/CassandraStore.py     | 39 ++++++++---
 .../granule_ingester/writers/SolrStore.py          | 79 ++++++++++++++--------
 granule_ingester/requirements.txt                  |  2 +
 7 files changed, 183 insertions(+), 55 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index e6f7262..2b6174a 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,4 +7,7 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
   $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
-  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
+  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) \
+  $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
new file mode 100644
index 0000000..c648b99
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -0,0 +1,41 @@
+class PipelineBuildingError(Exception):
+    pass
+
+
+class PipelineRunningError(Exception):
+    pass
+
+
+class TileProcessingError(Exception):
+    pass
+
+
+class LostConnectionError(Exception):
+    pass
+
+
+class RabbitMQLostConnectionError(LostConnectionError):
+    pass
+
+
+class CassandraLostConnectionError(LostConnectionError):
+    pass
+
+class SolrLostConnectionError(LostConnectionError):
+    pass
+
+
+class FailedHealthCheckError(Exception):
+    pass
+
+
+class CassandraFailedHealthCheckError(FailedHealthCheckError):
+    pass
+
+
+class SolrFailedHealthCheckError(FailedHealthCheckError):
+    pass
+
+
+class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
+    pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
new file mode 100644
index 0000000..ea0969f
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -0,0 +1,11 @@
+from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import CassandraLostConnectionError
+from .Exceptions import FailedHealthCheckError
+from .Exceptions import LostConnectionError
+from .Exceptions import PipelineBuildingError
+from .Exceptions import PipelineRunningError
+from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import RabbitMQLostConnectionError
+from .Exceptions import SolrFailedHealthCheckError
+from .Exceptions import SolrLostConnectionError
+from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 5a8fc2d..8b8d40f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -16,23 +16,24 @@
 import argparse
 import asyncio
 import logging
+import sys
 from functools import partial
 from typing import List
 
 from granule_ingester.consumer import Consumer
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore
-from granule_ingester.writers import SolrStore
+from granule_ingester.writers import CassandraStore, SolrStore
 
 
-def cassandra_factory(contact_points, port):
-    store = CassandraStore(contact_points, port)
+def cassandra_factory(contact_points, port, username, password):
+    store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password)
     store.connect()
     return store
 
 
-def solr_factory(solr_host_and_port):
-    store = SolrStore(solr_host_and_port)
+def solr_factory(solr_host_and_port, zk_host_and_port):
+    store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
     store.connect()
     return store
 
@@ -44,7 +45,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
     return True
 
 
-async def main():
+async def main(loop):
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
     parser.add_argument('--rabbitmq_host',
@@ -72,10 +73,20 @@ async def main():
                         default=9042,
                         metavar="PORT",
                         help='Cassandra port. (Default: 9042)')
+    parser.add_argument('--cassandra_username',
+                        metavar="USERNAME",
+                        default=None,
+                        help='Cassandra username. Optional.')
+    parser.add_argument('--cassandra_password',
+                        metavar="PASSWORD",
+                        default=None,
+                        help='Cassandra password. Optional.')
     parser.add_argument('--solr_host_and_port',
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('--zk_host_and_port',
+                        metavar="HOST:PORT")
     parser.add_argument('-v',
                         '--verbose',
                         action='store_true',
@@ -96,24 +107,42 @@ async def main():
 
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
+    cassandra_username = args.cassandra_username
+    cassandra_password = args.cassandra_password
     solr_host_and_port = args.solr_host_and_port
+    zk_host_and_port = args.zk_host_and_port
 
     consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
                         rabbitmq_username=args.rabbitmq_username,
                         rabbitmq_password=args.rabbitmq_password,
                         rabbitmq_queue=args.rabbitmq_queue,
-                        data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
-                        metadata_store_factory=partial(solr_factory, solr_host_and_port))
-    if await run_health_checks(
-            [CassandraStore(cassandra_contact_points, cassandra_port),
-             SolrStore(solr_host_and_port),
-             consumer]):
+                        data_store_factory=partial(cassandra_factory,
+                                                   cassandra_contact_points,
+                                                   cassandra_port,
+                                                   cassandra_username,
+                                                   cassandra_password),
+                        metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
+    try:
+        solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
+        await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                cassandra_port,
+                                                cassandra_username,
+                                                cassandra_password),
+                                 solr_store,
+                                 consumer])
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
             await consumer.start_consuming()
-    else:
-        logger.error("Quitting because not all dependencies passed the health checks.")
+    except FailedHealthCheckError as e:
+        logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
+    except LostConnectionError as e:
+        logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
+    except Exception as e:
+        logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+    finally:
+        sys.exit(1)
 
 
 if __name__ == '__main__':
-    asyncio.run(main())
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main(loop))
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 7a9f146..cb5232b 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,11 +18,14 @@ import asyncio
 import logging
 import uuid
 
-from cassandra.cluster import Cluster, Session
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, Session, NoHostAvailable
 from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
+from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
 from nexusproto.DataTile_pb2 import NexusTile, TileData
 
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
 from granule_ingester.writers.DataStore import DataStore
 
 logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -37,8 +40,10 @@ class TileModel(Model):
 
 
 class CassandraStore(DataStore):
-    def __init__(self, contact_points=None, port=9042):
+    def __init__(self, contact_points=None, port=9042, username=None, password=None):
         self._contact_points = contact_points
+        self._username = username
+        self._password = password
         self._port = port
         self._session = None
 
@@ -47,12 +52,22 @@ class CassandraStore(DataStore):
             session = self._get_session()
             session.shutdown()
             return True
-        except:
-            logger.error("Cannot connect to Cassandra!")
-            return False
+        except Exception:
+            raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
 
     def _get_session(self) -> Session:
-        cluster = Cluster(contact_points=self._contact_points, port=self._port)
+
+        if self._username and self._password:
+            auth_provider = PlainTextAuthProvider(username=self._username, password=self._password)
+        else:
+            auth_provider = None
+
+        cluster = Cluster(contact_points=self._contact_points,
+                          port=self._port,
+                          # load_balancing_policy=
+                          reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
+                          default_retry_policy=RetryPolicy(),
+                          auth_provider=auth_provider)
         session = cluster.connect()
         session.set_keyspace('nexustiles')
         return session
@@ -65,10 +80,14 @@ class CassandraStore(DataStore):
             self._session.shutdown()
 
     async def save_data(self, tile: NexusTile) -> None:
-        tile_id = uuid.UUID(tile.summary.tile_id)
-        serialized_tile_data = TileData.SerializeToString(tile.tile)
-        prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
-        await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+        try:
+            tile_id = uuid.UUID(tile.summary.tile_id)
+            serialized_tile_data = TileData.SerializeToString(tile.tile)
+            prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+            await self._execute_query_async(self._session, prepared_query,
+                                            [tile_id, bytearray(serialized_tile_data)])
+        except NoHostAvailable:
+            raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
 
     @staticmethod
     async def _execute_query_async(session: Session, query, parameters=None):
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 9d6a7f0..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,64 +13,87 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from asyncio import AbstractEventLoop
-
+import asyncio
+import functools
+import json
 import logging
+from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
 
-import aiohttp
+import pysolr
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.exceptions import NoNodeError
 from nexusproto.DataTile_pb2 import *
-from tenacity import *
 
+from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
 from granule_ingester.writers.MetadataStore import MetadataStore
 
 logger = logging.getLogger(__name__)
 
 
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
+
+
 class SolrStore(MetadataStore):
-    def __init__(self, host_and_port='http://localhost:8983'):
+    def __init__(self, solr_url=None, zk_url=None):
         super().__init__()
 
         self.TABLE_NAME = "sea_surface_temp"
         self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
 
-        self._host_and_port = host_and_port
+        self._solr_url = solr_url
+        self._zk_url = zk_url
         self.geo_precision: int = 3
-        self.collection: str = "nexustiles"
+        self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
         self.log.setLevel(logging.DEBUG)
-        self._session = None
+        self._solr = None
+
+    def _get_connection(self) -> pysolr.Solr:
+        if self._zk_url:
+            zk = pysolr.ZooKeeper(f"{self._zk_url}")
+            collections = {}
+            for c in zk.zk.get_children("collections"):
+                collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
+            zk.collections = collections
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+        elif self._solr_url:
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+        else:
+            raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._session = aiohttp.ClientSession(loop=loop)
+        self._solr = self._get_connection()
 
     async def health_check(self):
         try:
-            async with aiohttp.ClientSession() as session:
-                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
-                if response.status == 200:
-                    return True
-                else:
-                    logger.error("Solr health check returned status {}.".format(response.status))
-        except aiohttp.ClientConnectionError as e:
-            logger.error("Cannot connect to Solr!")
-
-        return False
+            connection = self._get_connection()
+            connection.ping()
+        except pysolr.SolrError:
+            raise SolrFailedHealthCheckError("Cannot connect to Solr!")
+        except NoNodeError:
+            raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
+        except KazooTimeoutError:
+            raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
 
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         solr_doc = self._build_solr_doc(nexus_tile)
+        await self._save_document(solr_doc)
 
-        await self._save_document(self.collection, solr_doc)
-
-    @retry(stop=stop_after_attempt(5))
-    async def _save_document(self, collection: str, doc: dict):
-        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
-        response = await self._session.post(url, json=doc)
-        if response.status < 200 or response.status >= 400:
-            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+    @run_in_executor
+    def _save_document(self, doc: dict):
+        try:
+            self._solr.add([doc])
+        except pysolr.SolrError:
+            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 4d9d4cb..16f83bf 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,5 @@
 cassandra-driver==3.23.0
+pysolr==3.9.0
+kazoo==2.8.0
 aiomultiprocess
 aioboto3