You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 22:49:24 UTC
[incubator-sdap-ingester] 09/11: solr history bug fixes
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 67703003e505095edc6eab8843e899d20df62347
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 27 19:31:17 2020 -0700
solr history bug fixes
---
granule_ingester/conda-requirements.txt | 2 +-
granule_ingester/granule_ingester/consumer/Consumer.py | 1 +
granule_ingester/granule_ingester/main.py | 2 ++
granule_ingester/granule_ingester/pipeline/Pipeline.py | 9 ++++++++-
granule_ingester/granule_ingester/writers/SolrStore.py | 2 +-
5 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
pyyaml==5.3.1
requests==2.23.0
aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
tenacity
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index d40b54c..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -104,4 +104,5 @@ class Consumer(HealthCheck):
raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
except Exception as e:
await queue_iter.close()
+ await channel.close()
raise e
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 87a6d5a..e50a395 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -109,6 +109,8 @@ async def main(loop):
cassandra_password = args.cassandra_password
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
+ cassandra_username = args.cassandra_username
+ cassandra_password = args.cassandra_password
solr_host_and_port = args.solr_host_and_port
zk_host_and_port = args.zk_host_and_port
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index a667d5e..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+import os
import pickle
import time
from multiprocessing import Manager
@@ -99,6 +100,8 @@ class Pipeline:
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
+ self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
# Create a SyncManager so that we can to communicate exceptions from the
# worker processes back to the main process.
self._manager = Manager()
@@ -161,6 +164,8 @@ class Pipeline:
return processor_module
async def run(self):
+
+ logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
async with self._granule_loader as (dataset, granule_name):
start = time.perf_counter()
@@ -170,7 +175,9 @@ class Pipeline:
dataset,
self._data_store_factory,
self._metadata_store_factory,
- shared_memory)) as pool:
+ shared_memory),
+ maxtasksperchild=self._max_concurrency,
+ childconcurrency=self._max_concurrency) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 926a75c..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -59,7 +59,7 @@ class SolrStore(MetadataStore):
def _get_connection(self) -> pysolr.Solr:
if self._zk_url:
- zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+ zk = pysolr.ZooKeeper(f"{self._zk_url}")
collections = {}
for c in zk.zk.get_children("collections"):
collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))