You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@sdap.apache.org by rk...@apache.org on 2024/03/06 18:29:07 UTC

(incubator-sdap-ingester) 01/03: Explicitly close Solr & ZK connections

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch SDAP-512
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 835123f88af734266e765b2b2dda1266f8a97a73
Author: rileykk <ri...@jpl.nasa.gov>
AuthorDate: Wed Mar 6 09:41:52 2024 -0800

    Explicitly close Solr & ZK connections
---
 .../granule_ingester/pipeline/Pipeline.py          |  6 +++++-
 .../granule_ingester/writers/MetadataStore.py      | 10 +++++++++
 .../granule_ingester/writers/SolrStore.py          | 25 ++++++++++++++++------
 3 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 9ebb529..541700f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -257,7 +257,11 @@ class Pipeline:
                 logger.info(f"Now writing generated tiles...")
 
                 await self._data_store_factory().save_batch(results)
-                await self._metadata_store_factory().save_batch(results)
+
+                metadata_store = self._metadata_store_factory()
+                await metadata_store.save_batch(results)
+
+                metadata_store.close()
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 8a97566..7296075 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -18,6 +18,8 @@ from abc import ABC, abstractmethod
 from nexusproto import DataTile_pb2 as nexusproto
 
 from granule_ingester.healthcheck import HealthCheck
+from asyncio import AbstractEventLoop
+
 
 from typing import List
 
@@ -31,3 +33,11 @@ class MetadataStore(HealthCheck, ABC):
     def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
         pass
 
+    @abstractmethod
+    def connect(self, loop: AbstractEventLoop = None) -> None:
+        pass
+
+    @abstractmethod
+    def close(self) -> None:
+        pass
+
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 2dac038..963e4bc 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,7 +21,7 @@ import logging
 from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
-from typing import Dict, List, Union
+from typing import Dict, List, Union, Tuple, Optional
 
 import pysolr
 from kazoo.exceptions import NoNodeError
@@ -51,7 +51,8 @@ class SolrStore(MetadataStore):
         self.geo_precision: int = 3
         self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
-        self._solr = None
+        self._solr: Optional[pysolr.Solr] = None
+        self._zk: Optional[pysolr.ZooKeeper] = None
 
     def _get_collections(self, zk, parent_nodes):
         """
@@ -85,23 +86,33 @@ class SolrStore(MetadataStore):
             collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
         zk.collections = collections
 
-    def _get_connection(self) -> pysolr.Solr:
+    def _get_connection(self) -> Tuple[pysolr.Solr, Union[pysolr.ZooKeeper, None]]:
         if self._zk_url:
             zk = pysolr.ZooKeeper(f"{self._zk_url}")
             self._set_solr_status(zk)
-            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True), zk
         elif self._solr_url:
-            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True), None
         else:
             raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._solr = self._get_connection()
+        self._solr, self._zk = self._get_connection()
+
+    def close(self):
+        if self._solr is not None:
+            self._solr.get_session().close()
+
+        if self._zk is not None:
+            self._zk.zk.stop()
+            self._zk.zk.close()
 
     async def health_check(self):
         try:
-            connection = self._get_connection()
+            connection, _ = self._get_connection()
             connection.ping()
+
+            self.close()
         except pysolr.SolrError:
             raise SolrFailedHealthCheckError("Cannot connect to Solr!")
         except NoNodeError: