You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 22:49:23 UTC
[incubator-sdap-ingester] 08/11: use pysolr
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 4b81e13601a94ff9a943bbf4c9bd5e644c651272
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 16 18:24:00 2020 -0700
use pysolr
---
granule_ingester/docker/entrypoint.sh | 1 +
granule_ingester/granule_ingester/main.py | 14 ++--
.../granule_ingester/pipeline/Pipeline.py | 4 +-
.../granule_ingester/writers/SolrStore.py | 81 ++++++++++++++--------
granule_ingester/requirements.txt | 4 +-
5 files changed, 67 insertions(+), 37 deletions(-)
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index b703ee3..b8369c6 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -10,3 +10,4 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
$([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
$([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+ $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 751da19..87a6d5a 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,16 +15,15 @@
import argparse
import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
import logging
+import sys
from functools import partial
from typing import List
from granule_ingester.consumer import Consumer
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore
-from granule_ingester.writers import SolrStore
-import sys
+from granule_ingester.writers import CassandraStore, SolrStore
def cassandra_factory(contact_points, port, username, password):
@@ -33,8 +32,8 @@ def cassandra_factory(contact_points, port, username, password):
return store
-def solr_factory(solr_host_and_port):
- store = SolrStore(solr_host_and_port)
+def solr_factory(solr_host_and_port, zk_host_and_port):
+ store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
store.connect()
return store
@@ -86,6 +85,8 @@ async def main(loop):
default='http://localhost:8983',
metavar='HOST:PORT',
help='Solr host and port. (Default: http://localhost:8983)')
+ parser.add_argument('--zk_host_and_port',
+ metavar="HOST:PORT")
parser.add_argument('-v',
'--verbose',
action='store_true',
@@ -109,6 +110,7 @@ async def main(loop):
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
solr_host_and_port = args.solr_host_and_port
+ zk_host_and_port = args.zk_host_and_port
consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
rabbitmq_username=args.rabbitmq_username,
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f1aa021..a667d5e 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -27,7 +27,7 @@ from nexusproto import DataTile_pb2 as nexusproto
from tblib import pickling_support
from yaml.scanner import ScannerError
-from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
+from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -180,6 +180,8 @@ class Pipeline:
await pool.map(_process_tile_in_worker, chunk)
except ProxyException:
pool.terminate()
+ # Give the shared memory manager some time to write the exception
+ # await asyncio.sleep(1)
raise pickle.loads(shared_memory.error)
end = time.perf_counter()
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 6baad28..926a75c 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,64 +13,87 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from asyncio import AbstractEventLoop
-
+import asyncio
+import functools
+import json
import logging
+from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
from typing import Dict
-import aiohttp
+import pysolr
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.exceptions import NoNodeError
from nexusproto.DataTile_pb2 import *
-from tenacity import *
+from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
from granule_ingester.writers.MetadataStore import MetadataStore
-from granule_ingester.exceptions import SolrFailedHealthCheckError
+
logger = logging.getLogger(__name__)
+def run_in_executor(f):
+ @functools.wraps(f)
+ def inner(*args, **kwargs):
+ loop = asyncio.get_running_loop()
+ return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+ return inner
+
+
class SolrStore(MetadataStore):
- def __init__(self, host_and_port='http://localhost:8983'):
+ def __init__(self, solr_url=None, zk_url=None):
super().__init__()
self.TABLE_NAME = "sea_surface_temp"
self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
- self._host_and_port = host_and_port
+ self._solr_url = solr_url
+ self._zk_url = zk_url
self.geo_precision: int = 3
- self.collection: str = "nexustiles"
+ self._collection: str = "nexustiles"
self.log: logging.Logger = logging.getLogger(__name__)
self.log.setLevel(logging.DEBUG)
- self._session = None
+ self._solr = None
+
+ def _get_connection(self) -> pysolr.Solr:
+ if self._zk_url:
+ zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+ collections = {}
+ for c in zk.zk.get_children("collections"):
+ collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
+ zk.collections = collections
+ return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+ elif self._solr_url:
+ return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+ else:
+ raise RuntimeError("You must provide either solr_host or zookeeper_host.")
def connect(self, loop: AbstractEventLoop = None):
- self._session = aiohttp.ClientSession(loop=loop)
+ self._solr = self._get_connection()
async def health_check(self):
try:
- async with aiohttp.ClientSession() as session:
- response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
- if response.status == 200:
- return True
- else:
- logger.error("Solr health check returned status {}.".format(response.status))
- except aiohttp.ClientConnectionError as e:
- raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
-
- return False
+ connection = self._get_connection()
+ connection.ping()
+ except pysolr.SolrError:
+ raise SolrFailedHealthCheckError("Cannot connect to Solr!")
+ except NoNodeError:
+ raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
+ except KazooTimeoutError:
+ raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
async def save_metadata(self, nexus_tile: NexusTile) -> None:
solr_doc = self._build_solr_doc(nexus_tile)
+ await self._save_document(solr_doc)
- await self._save_document(self.collection, solr_doc)
-
- @retry(stop=stop_after_attempt(5))
- async def _save_document(self, collection: str, doc: dict):
- url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
- response = await self._session.post(url, json=doc)
- if response.status < 200 or response.status >= 400:
- raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+ @run_in_executor
+ def _save_document(self, doc: dict):
+ try:
+ self._solr.add([doc])
+ except pysolr.SolrError:
+ raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
def _build_solr_doc(self, tile: NexusTile) -> Dict:
summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 0479f99..9b06860 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,4 +1,6 @@
cassandra-driver==3.23.0
aiomultiprocess==0.7.0
aioboto3
-tblib==1.6.0
\ No newline at end of file
+tblib==1.6.0
+pysolr==3.9.0
+kazoo==2.8.0
\ No newline at end of file