You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to commits@sdap.apache.org by rk...@apache.org on 2024/03/06 18:29:07 UTC
(incubator-sdap-ingester) 01/03: Explicitly close Solr & ZK connections
This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch SDAP-512
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 835123f88af734266e765b2b2dda1266f8a97a73
Author: rileykk <ri...@jpl.nasa.gov>
AuthorDate: Wed Mar 6 09:41:52 2024 -0800
Explicitly close Solr & ZK connections
---
.../granule_ingester/pipeline/Pipeline.py | 6 +++++-
.../granule_ingester/writers/MetadataStore.py | 10 +++++++++
.../granule_ingester/writers/SolrStore.py | 25 ++++++++++++++++------
3 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 9ebb529..541700f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -257,7 +257,11 @@ class Pipeline:
logger.info(f"Now writing generated tiles...")
await self._data_store_factory().save_batch(results)
- await self._metadata_store_factory().save_batch(results)
+
+ metadata_store = self._metadata_store_factory()
+ await metadata_store.save_batch(results)
+
+ metadata_store.close()
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 8a97566..7296075 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -18,6 +18,8 @@ from abc import ABC, abstractmethod
from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.healthcheck import HealthCheck
+from asyncio import AbstractEventLoop
+
from typing import List
@@ -31,3 +33,11 @@ class MetadataStore(HealthCheck, ABC):
def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
pass
+ @abstractmethod
+ def connect(self, loop: AbstractEventLoop = None) -> None:
+ pass
+
+ @abstractmethod
+ def close(self) -> None:
+ pass
+
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 2dac038..963e4bc 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,7 +21,7 @@ import logging
from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
-from typing import Dict, List, Union
+from typing import Dict, List, Union, Tuple, Optional
import pysolr
from kazoo.exceptions import NoNodeError
@@ -51,7 +51,8 @@ class SolrStore(MetadataStore):
self.geo_precision: int = 3
self._collection: str = "nexustiles"
self.log: logging.Logger = logging.getLogger(__name__)
- self._solr = None
+ self._solr: Optional[pysolr.Solr] = None
+ self._zk: Optional[pysolr.ZooKeeper] = None
def _get_collections(self, zk, parent_nodes):
"""
@@ -85,23 +86,33 @@ class SolrStore(MetadataStore):
collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
zk.collections = collections
- def _get_connection(self) -> pysolr.Solr:
+ def _get_connection(self) -> Tuple[pysolr.Solr, Union[pysolr.ZooKeeper, None]]:
if self._zk_url:
zk = pysolr.ZooKeeper(f"{self._zk_url}")
self._set_solr_status(zk)
- return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+ return pysolr.SolrCloud(zk, self._collection, always_commit=True), zk
elif self._solr_url:
- return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+ return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True), None
else:
raise RuntimeError("You must provide either solr_host or zookeeper_host.")
def connect(self, loop: AbstractEventLoop = None):
- self._solr = self._get_connection()
+ self._solr, self._zk = self._get_connection()
+
+ def close(self):
+ if self._solr is not None:
+ self._solr.get_session().close()
+
+ if self._zk is not None:
+ self._zk.zk.stop()
+ self._zk.zk.close()
async def health_check(self):
try:
- connection = self._get_connection()
+ connection, _ = self._get_connection()
connection.ping()
+
+ self.close()
except pysolr.SolrError:
raise SolrFailedHealthCheckError("Cannot connect to Solr!")
except NoNodeError: