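You are viewing a plain-text version of this content. The canonical version is available via the link on the original mailing-list archive page (the hyperlink itself was lost in this plain-text rendering).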
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 21:03:59 UTC
[incubator-sdap-ingester] branch cassandra-auth updated: Revert
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch cassandra-auth
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/cassandra-auth by this push:
new f6d2822 Revert
f6d2822 is described below
commit f6d2822275e518f0e25d1867ff7be5b3fc7fd363
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 14:03:42 2020 -0700
Revert
---
granule_ingester/granule_ingester/main.py | 47 +++++--------
.../granule_ingester/writers/SolrStore.py | 79 ++++++++--------------
granule_ingester/requirements.txt | 1 -
3 files changed, 45 insertions(+), 82 deletions(-)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 8b8d40f..9010e33 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -16,14 +16,13 @@
import argparse
import asyncio
import logging
-import sys
from functools import partial
from typing import List
from granule_ingester.consumer import Consumer
-from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore, SolrStore
+from granule_ingester.writers import CassandraStore
+from granule_ingester.writers import SolrStore
def cassandra_factory(contact_points, port, username, password):
@@ -32,8 +31,8 @@ def cassandra_factory(contact_points, port, username, password):
return store
-def solr_factory(solr_host_and_port, zk_host_and_port):
- store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
+def solr_factory(solr_host_and_port):
+ store = SolrStore(solr_host_and_port)
store.connect()
return store
@@ -45,7 +44,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
return True
-async def main(loop):
+async def main():
parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
'and ingest a granule for each message that comes through.')
parser.add_argument('--rabbitmq_host',
@@ -85,8 +84,6 @@ async def main(loop):
default='http://localhost:8983',
metavar='HOST:PORT',
help='Solr host and port. (Default: http://localhost:8983)')
- parser.add_argument('--zk_host_and_port',
- metavar="HOST:PORT")
parser.add_argument('-v',
'--verbose',
action='store_true',
@@ -105,12 +102,11 @@ async def main(loop):
config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg)) for arg in vars(args)])
logger.info("Using configuration values:\n{}".format(config_values_str))
- cassandra_contact_points = args.cassandra_contact_points
- cassandra_port = args.cassandra_port
cassandra_username = args.cassandra_username
cassandra_password = args.cassandra_password
+ cassandra_contact_points = args.cassandra_contact_points
+ cassandra_port = args.cassandra_port
solr_host_and_port = args.solr_host_and_port
- zk_host_and_port = args.zk_host_and_port
consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
rabbitmq_username=args.rabbitmq_username,
@@ -121,28 +117,19 @@ async def main(loop):
cassandra_port,
cassandra_username,
cassandra_password),
- metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
- try:
- solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
- await run_health_checks([CassandraStore(cassandra_contact_points,
- cassandra_port,
- cassandra_username,
- cassandra_password),
- solr_store,
- consumer])
+ metadata_store_factory=partial(solr_factory, solr_host_and_port))
+ if await run_health_checks([CassandraStore(cassandra_contact_points,
+ cassandra_port,
+ cassandra_username,
+ cassandra_password),
+ SolrStore(solr_host_and_port),
+ consumer]):
async with consumer:
logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
await consumer.start_consuming()
- except FailedHealthCheckError as e:
- logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
- except LostConnectionError as e:
- logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
- except Exception as e:
- logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
- finally:
- sys.exit(1)
+ else:
+ logger.error("Quitting because not all dependencies passed the health checks.")
if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main(loop))
+ asyncio.run(main())
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 276a988..9d6a7f0 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,87 +13,64 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import asyncio
-import functools
-import json
-import logging
+
from asyncio import AbstractEventLoop
+
+import logging
from datetime import datetime
from pathlib import Path
from typing import Dict
-import pysolr
-from kazoo.handlers.threading import KazooTimeoutError
-from kazoo.exceptions import NoNodeError
+import aiohttp
from nexusproto.DataTile_pb2 import *
+from tenacity import *
-from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
from granule_ingester.writers.MetadataStore import MetadataStore
logger = logging.getLogger(__name__)
-def run_in_executor(f):
- @functools.wraps(f)
- def inner(*args, **kwargs):
- loop = asyncio.get_running_loop()
- return loop.run_in_executor(None, lambda: f(*args, **kwargs))
-
- return inner
-
-
class SolrStore(MetadataStore):
- def __init__(self, solr_url=None, zk_url=None):
+ def __init__(self, host_and_port='http://localhost:8983'):
super().__init__()
self.TABLE_NAME = "sea_surface_temp"
self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
- self._solr_url = solr_url
- self._zk_url = zk_url
+ self._host_and_port = host_and_port
self.geo_precision: int = 3
- self._collection: str = "nexustiles"
+ self.collection: str = "nexustiles"
self.log: logging.Logger = logging.getLogger(__name__)
self.log.setLevel(logging.DEBUG)
- self._solr = None
-
- def _get_connection(self) -> pysolr.Solr:
- if self._zk_url:
- zk = pysolr.ZooKeeper(f"{self._zk_url}")
- collections = {}
- for c in zk.zk.get_children("collections"):
- collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
- zk.collections = collections
- return pysolr.SolrCloud(zk, self._collection, always_commit=True)
- elif self._solr_url:
- return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
- else:
- raise RuntimeError("You must provide either solr_host or zookeeper_host.")
+ self._session = None
def connect(self, loop: AbstractEventLoop = None):
- self._solr = self._get_connection()
+ self._session = aiohttp.ClientSession(loop=loop)
async def health_check(self):
try:
- connection = self._get_connection()
- connection.ping()
- except pysolr.SolrError:
- raise SolrFailedHealthCheckError("Cannot connect to Solr!")
- except NoNodeError:
- raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
- except KazooTimeoutError:
- raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
+ async with aiohttp.ClientSession() as session:
+ response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
+ if response.status == 200:
+ return True
+ else:
+ logger.error("Solr health check returned status {}.".format(response.status))
+ except aiohttp.ClientConnectionError as e:
+ logger.error("Cannot connect to Solr!")
+
+ return False
async def save_metadata(self, nexus_tile: NexusTile) -> None:
solr_doc = self._build_solr_doc(nexus_tile)
- await self._save_document(solr_doc)
- @run_in_executor
- def _save_document(self, doc: dict):
- try:
- self._solr.add([doc])
- except pysolr.SolrError:
- raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
+ await self._save_document(self.collection, solr_doc)
+
+ @retry(stop=stop_after_attempt(5))
+ async def _save_document(self, collection: str, doc: dict):
+ url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
+ response = await self._session.post(url, json=doc)
+ if response.status < 200 or response.status >= 400:
+ raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
def _build_solr_doc(self, tile: NexusTile) -> Dict:
summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 16f83bf..ff1be14 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,5 +1,4 @@
cassandra-driver==3.23.0
-pysolr==3.9.0
kazoo==2.8.0
aiomultiprocess
aioboto3