Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 22:49:24 UTC

[incubator-sdap-ingester] 09/11: solr history bug fixes

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 67703003e505095edc6eab8843e899d20df62347
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 27 19:31:17 2020 -0700

    solr history bug fixes
---
 granule_ingester/conda-requirements.txt                | 2 +-
 granule_ingester/granule_ingester/consumer/Consumer.py | 1 +
 granule_ingester/granule_ingester/main.py              | 2 ++
 granule_ingester/granule_ingester/pipeline/Pipeline.py | 9 ++++++++-
 granule_ingester/granule_ingester/writers/SolrStore.py | 2 +-
 5 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
 tenacity
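
Pinning aio-pika to an exact version protects the consumer code below from upstream API changes between releases. As a quick sanity check (generic Python, not part of this commit), the installed version can be verified at runtime:

    import aio_pika

    # Expect "6.6.1" after installing from the pinned requirements file.
    print(aio_pika.__version__)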
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index d40b54c..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -104,4 +104,5 @@ class Consumer(HealthCheck):
                 raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
                 await queue_iter.close()
+                await channel.close()
                 raise e
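
Closing only the queue iterator on failure left the underlying channel open, so RabbitMQ could keep delivering messages to a half-torn-down consumer. A minimal sketch of the cleanup pattern this change establishes, assuming aio-pika 6.x; the connection details and message handling here are hypothetical, not the project's actual code:

    import aio_pika

    async def consume(rabbitmq_url: str, queue_name: str):
        connection = await aio_pika.connect_robust(rabbitmq_url)
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name, durable=True)
        queue_iter = queue.iterator()
        try:
            async for message in queue_iter:
                async with message.process():
                    ...  # hand the message off for processing
        except Exception:
            # Close the iterator AND the channel before re-raising,
            # mirroring the fix above.
            await queue_iter.close()
            await channel.close()
            raise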
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 87a6d5a..e50a395 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -109,6 +109,8 @@ async def main(loop):
     cassandra_password = args.cassandra_password
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
+    cassandra_username = args.cassandra_username
+    cassandra_password = args.cassandra_password
     solr_host_and_port = args.solr_host_and_port
     zk_host_and_port = args.zk_host_and_port
 
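The two added lines pull the Cassandra credentials out of the parsed arguments alongside the existing connection settings. For context, credentials like these are typically handed to the DataStax driver's plain-text auth provider; a generic sketch under that assumption, with placeholder values standing in for the parsed arguments (not this project's actual wiring):

    from cassandra.auth import PlainTextAuthProvider
    from cassandra.cluster import Cluster

    # Placeholders for args.cassandra_username / args.cassandra_password etc.
    auth_provider = PlainTextAuthProvider(username="cassandra_user",
                                          password="cassandra_pass")
    cluster = Cluster(contact_points=["127.0.0.1"],
                      port=9042,
                      auth_provider=auth_provider)
    session = cluster.connect()
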
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index a667d5e..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+import os
 import pickle
 import time
 from multiprocessing import Manager
@@ -99,6 +100,8 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
+        self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
         # Create a SyncManager so that we can communicate exceptions from the
         # worker processes back to the main process.
         self._manager = Manager()
@@ -161,6 +164,8 @@ class Pipeline:
         return processor_module
 
     async def run(self):
+
+        logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
 
@@ -170,7 +175,9 @@ class Pipeline:
                                       dataset,
                                       self._data_store_factory,
                                       self._metadata_store_factory,
-                                      shared_memory)) as pool:
+                                      shared_memory),
+                            maxtasksperchild=self._max_concurrency,
+                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
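
The Pipeline changes above read MAX_CONCURRENCY from the environment and pass it to aiomultiprocess as both maxtasksperchild and childconcurrency: each worker process runs up to that many coroutines at once and is recycled after completing that many tasks. A stripped-down sketch of the same pattern, with a hypothetical worker standing in for the real tile processor:

    import asyncio
    import os

    from aiomultiprocess import Pool

    async def process_tile(tile_bytes: bytes) -> bytes:
        # Hypothetical stand-in for the pipeline's real tile processing.
        return tile_bytes

    async def run(serialized_tiles):
        max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
        async with Pool(maxtasksperchild=max_concurrency,
                        childconcurrency=max_concurrency) as pool:
            return await pool.map(process_tile, serialized_tiles)

    if __name__ == '__main__':
        asyncio.run(run([b'tile1', b'tile2']))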
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 926a75c..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -59,7 +59,7 @@ class SolrStore(MetadataStore):
 
     def _get_connection(self) -> pysolr.Solr:
         if self._zk_url:
-            zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+            zk = pysolr.ZooKeeper(f"{self._zk_url}")
             collections = {}
             for c in zk.zk.get_children("collections"):
                 collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
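
The SolrStore fix stops hard-coding a "/solr" suffix onto the ZooKeeper connection string, so any Solr chroot must now be supplied as part of zk_url itself. A minimal usage sketch under that assumption, with a hypothetical host and collection name:

    import pysolr

    # Hypothetical values; in the ingester these come from --zk_host_and_port.
    zk_url = "zk-host:2181/solr"  # chroot now included by the caller

    zk = pysolr.ZooKeeper(zk_url)              # passed through unmodified
    solr = pysolr.SolrCloud(zk, "nexustiles")  # hypothetical collection name
    results = solr.search("*:*", rows=1)       # simple connectivity check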