You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 21:03:59 UTC

[incubator-sdap-ingester] branch cassandra-auth updated: Revert

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch cassandra-auth
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/cassandra-auth by this push:
     new f6d2822  Revert
f6d2822 is described below

commit f6d2822275e518f0e25d1867ff7be5b3fc7fd363
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 14:03:42 2020 -0700

    Revert
---
 granule_ingester/granule_ingester/main.py          | 47 +++++--------
 .../granule_ingester/writers/SolrStore.py          | 79 ++++++++--------------
 granule_ingester/requirements.txt                  |  1 -
 3 files changed, 45 insertions(+), 82 deletions(-)

diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 8b8d40f..9010e33 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -16,14 +16,13 @@
 import argparse
 import asyncio
 import logging
-import sys
 from functools import partial
 from typing import List
 
 from granule_ingester.consumer import Consumer
-from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore, SolrStore
+from granule_ingester.writers import CassandraStore
+from granule_ingester.writers import SolrStore
 
 
 def cassandra_factory(contact_points, port, username, password):
@@ -32,8 +31,8 @@ def cassandra_factory(contact_points, port, username, password):
     return store
 
 
-def solr_factory(solr_host_and_port, zk_host_and_port):
-    store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
+def solr_factory(solr_host_and_port):
+    store = SolrStore(solr_host_and_port)
     store.connect()
     return store
 
@@ -45,7 +44,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
     return True
 
 
-async def main(loop):
+async def main():
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
     parser.add_argument('--rabbitmq_host',
@@ -85,8 +84,6 @@ async def main(loop):
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
-    parser.add_argument('--zk_host_and_port',
-                        metavar="HOST:PORT")
     parser.add_argument('-v',
                         '--verbose',
                         action='store_true',
@@ -105,12 +102,11 @@ async def main(loop):
     config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg)) for arg in vars(args)])
     logger.info("Using configuration values:\n{}".format(config_values_str))
 
-    cassandra_contact_points = args.cassandra_contact_points
-    cassandra_port = args.cassandra_port
     cassandra_username = args.cassandra_username
     cassandra_password = args.cassandra_password
+    cassandra_contact_points = args.cassandra_contact_points
+    cassandra_port = args.cassandra_port
     solr_host_and_port = args.solr_host_and_port
-    zk_host_and_port = args.zk_host_and_port
 
     consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
                         rabbitmq_username=args.rabbitmq_username,
@@ -121,28 +117,19 @@ async def main(loop):
                                                    cassandra_port,
                                                    cassandra_username,
                                                    cassandra_password),
-                        metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
-    try:
-        solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
-        await run_health_checks([CassandraStore(cassandra_contact_points,
-                                                cassandra_port,
-                                                cassandra_username,
-                                                cassandra_password),
-                                 solr_store,
-                                 consumer])
+                        metadata_store_factory=partial(solr_factory, solr_host_and_port))
+    if await run_health_checks([CassandraStore(cassandra_contact_points,
+                                               cassandra_port,
+                                               cassandra_username,
+                                               cassandra_password),
+                                SolrStore(solr_host_and_port),
+                                consumer]):
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
             await consumer.start_consuming()
-    except FailedHealthCheckError as e:
-        logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
-    except LostConnectionError as e:
-        logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
-    except Exception as e:
-        logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
-    finally:
-        sys.exit(1)
+    else:
+        logger.error("Quitting because not all dependencies passed the health checks.")
 
 
 if __name__ == '__main__':
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(main(loop))
+    asyncio.run(main())
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 276a988..9d6a7f0 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,87 +13,64 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import asyncio
-import functools
-import json
-import logging
+
 from asyncio import AbstractEventLoop
+
+import logging
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
 
-import pysolr
-from kazoo.handlers.threading import KazooTimeoutError
-from kazoo.exceptions import NoNodeError
+import aiohttp
 from nexusproto.DataTile_pb2 import *
+from tenacity import *
 
-from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
 from granule_ingester.writers.MetadataStore import MetadataStore
 
 logger = logging.getLogger(__name__)
 
 
-def run_in_executor(f):
-    @functools.wraps(f)
-    def inner(*args, **kwargs):
-        loop = asyncio.get_running_loop()
-        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
-
-    return inner
-
-
 class SolrStore(MetadataStore):
-    def __init__(self, solr_url=None, zk_url=None):
+    def __init__(self, host_and_port='http://localhost:8983'):
         super().__init__()
 
         self.TABLE_NAME = "sea_surface_temp"
         self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
 
-        self._solr_url = solr_url
-        self._zk_url = zk_url
+        self._host_and_port = host_and_port
         self.geo_precision: int = 3
-        self._collection: str = "nexustiles"
+        self.collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
         self.log.setLevel(logging.DEBUG)
-        self._solr = None
-
-    def _get_connection(self) -> pysolr.Solr:
-        if self._zk_url:
-            zk = pysolr.ZooKeeper(f"{self._zk_url}")
-            collections = {}
-            for c in zk.zk.get_children("collections"):
-                collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
-            zk.collections = collections
-            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
-        elif self._solr_url:
-            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
-        else:
-            raise RuntimeError("You must provide either solr_host or zookeeper_host.")
+        self._session = None
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._solr = self._get_connection()
+        self._session = aiohttp.ClientSession(loop=loop)
 
     async def health_check(self):
         try:
-            connection = self._get_connection()
-            connection.ping()
-        except pysolr.SolrError:
-            raise SolrFailedHealthCheckError("Cannot connect to Solr!")
-        except NoNodeError:
-            raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
-        except KazooTimeoutError:
-            raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
+            async with aiohttp.ClientSession() as session:
+                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
+                if response.status == 200:
+                    return True
+                else:
+                    logger.error("Solr health check returned status {}.".format(response.status))
+        except aiohttp.ClientConnectionError as e:
+            logger.error("Cannot connect to Solr!")
+
+        return False
 
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         solr_doc = self._build_solr_doc(nexus_tile)
-        await self._save_document(solr_doc)
 
-    @run_in_executor
-    def _save_document(self, doc: dict):
-        try:
-            self._solr.add([doc])
-        except pysolr.SolrError:
-            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
+        await self._save_document(self.collection, solr_doc)
+
+    @retry(stop=stop_after_attempt(5))
+    async def _save_document(self, collection: str, doc: dict):
+        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
+        response = await self._session.post(url, json=doc)
+        if response.status < 200 or response.status >= 400:
+            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 16f83bf..ff1be14 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,5 +1,4 @@
 cassandra-driver==3.23.0
-pysolr==3.9.0
 kazoo==2.8.0
 aiomultiprocess
 aioboto3