You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/03 20:59:39 UTC

[incubator-sdap-ingester] 09/10: solr history bug fixes

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 09d3da171c3bc84a21f3bc7a84998dfbfc52ba0d
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 27 19:31:17 2020 -0700

    solr history bug fixes
---
 .../history_manager/SolrIngestionHistory.py        | 19 +++++++---------
 granule_ingester/conda-requirements.txt            |  2 +-
 granule_ingester/docker/entrypoint.sh              |  4 +++-
 .../granule_ingester/consumer/Consumer.py          |  1 +
 granule_ingester/granule_ingester/main.py          | 25 ++++++++++++++++++----
 .../granule_ingester/pipeline/Pipeline.py          |  9 +++++++-
 .../granule_ingester/writers/CassandraStore.py     | 14 ++++++++++--
 .../granule_ingester/writers/SolrStore.py          |  2 +-
 8 files changed, 55 insertions(+), 21 deletions(-)

diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 1ae7156..4e6d3e5 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -35,8 +35,8 @@ class SolrIngestionHistory(IngestionHistory):
         try:
             self._solr_url = solr_url
             self._create_collection_if_needed()
-            self._solr_granules = pysolr.Solr('/'.join([solr_url.strip('/'), self._granule_collection_name]))
-            self._solr_datasets = pysolr.Solr('/'.join([solr_url.strip('/'), self._dataset_collection_name]))
+            self._solr_granules = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._granule_collection_name}")
+            self._solr_datasets = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._dataset_collection_name}")
             self._dataset_id = dataset_id
             self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
             self._latest_ingested_file_update = self._get_latest_file_update()
@@ -63,7 +63,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_datasets.add([{
                 'id': self._dataset_id,
                 'dataset_s': self._dataset_id,
-                'latest_update_l': self._latest_ingested_file_update}])
+                'latest_update_l': int(self._latest_ingested_file_update)}])
             self._solr_datasets.commit()
 
     def _get_latest_file_update(self):
@@ -87,8 +87,7 @@ class SolrIngestionHistory(IngestionHistory):
                 self._req_session = requests.session()
 
             payload = {'action': 'CLUSTERSTATUS'}
-            result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
-                                           params=payload)
+            result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
             response = result.json()
             node_number = len(response['cluster']['live_nodes'])
 
@@ -100,12 +99,11 @@ class SolrIngestionHistory(IngestionHistory):
                            'name': self._granule_collection_name,
                            'numShards': node_number
                            }
-                result = self._req_session.get('/'.join([self._solr_url.strip("/"), 'admin', 'collections']),
-                                               params=payload)
+                result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
                 response = result.json()
                 logger.info(f"solr collection created {response}")
                 # Update schema
-                schema_url = '/'.join([self._solr_url.strip('/'), self._granule_collection_name, 'schema'])
+                schema_url = f"{self._solr_url.strip('/')}/{self._granule_collection_name}/schema"
                 # granule_s # dataset_s so that all the granule of a dataset are less likely to be on the same shard
                 # self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
                 self._add_field(schema_url, "dataset_s", "StrField")
@@ -121,13 +119,12 @@ class SolrIngestionHistory(IngestionHistory):
                            'name': self._dataset_collection_name,
                            'numShards': node_number
                            }
-                result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
-                                               params=payload)
+                result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
                 response = result.json()
                 logger.info(f"solr collection created {response}")
                 # Update schema
                 # http://localhost:8983/solr/nexusdatasets/schema?_=1588555874864&wt=json
-                schema_url = '/'.join([self._solr_url.strip('/'), self._dataset_collection_name, 'schema'])
+                schema_url = f"{self._solr_url.strip('/')}/{self._dataset_collection_name}/schema"
                 # self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
                 self._add_field(schema_url, "dataset_s", "StrField")
                 self._add_field(schema_url, "latest_update_l", "TrieLongField")
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index 3a1cb9b..2b6174a 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,5 +7,7 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
   $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
-  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
+  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) \
   $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index d40b54c..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -104,4 +104,5 @@ class Consumer(HealthCheck):
                 raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
                 await queue_iter.close()
+                await channel.close()
                 raise e
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index f602da8..8b8d40f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -26,8 +26,8 @@ from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.writers import CassandraStore, SolrStore
 
 
-def cassandra_factory(contact_points, port):
-    store = CassandraStore(contact_points, port)
+def cassandra_factory(contact_points, port, username, password):
+    store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password)
     store.connect()
     return store
 
@@ -73,6 +73,14 @@ async def main(loop):
                         default=9042,
                         metavar="PORT",
                         help='Cassandra port. (Default: 9042)')
+    parser.add_argument('--cassandra_username',
+                        metavar="USERNAME",
+                        default=None,
+                        help='Cassandra username. Optional.')
+    parser.add_argument('--cassandra_password',
+                        metavar="PASSWORD",
+                        default=None,
+                        help='Cassandra password. Optional.')
     parser.add_argument('--solr_host_and_port',
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
@@ -99,6 +107,8 @@ async def main(loop):
 
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
+    cassandra_username = args.cassandra_username
+    cassandra_password = args.cassandra_password
     solr_host_and_port = args.solr_host_and_port
     zk_host_and_port = args.zk_host_and_port
 
@@ -106,11 +116,18 @@ async def main(loop):
                         rabbitmq_username=args.rabbitmq_username,
                         rabbitmq_password=args.rabbitmq_password,
                         rabbitmq_queue=args.rabbitmq_queue,
-                        data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
+                        data_store_factory=partial(cassandra_factory,
+                                                   cassandra_contact_points,
+                                                   cassandra_port,
+                                                   cassandra_username,
+                                                   cassandra_password),
                         metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
     try:
         solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
-        await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
+        await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                cassandra_port,
+                                                cassandra_username,
+                                                cassandra_password),
                                  solr_store,
                                  consumer])
         async with consumer:
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index a667d5e..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+import os
 import pickle
 import time
 from multiprocessing import Manager
@@ -99,6 +100,8 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
+        self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
         # Create a SyncManager so that we can to communicate exceptions from the
         # worker processes back to the main process.
         self._manager = Manager()
@@ -161,6 +164,8 @@ class Pipeline:
         return processor_module
 
     async def run(self):
+
+        logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
 
@@ -170,7 +175,9 @@ class Pipeline:
                                       dataset,
                                       self._data_store_factory,
                                       self._metadata_store_factory,
-                                      shared_memory)) as pool:
+                                      shared_memory),
+                            maxtasksperchild=self._max_concurrency,
+                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 6b2cf32..cb5232b 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,6 +18,7 @@ import asyncio
 import logging
 import uuid
 
+from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, Session, NoHostAvailable
 from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
@@ -39,8 +40,10 @@ class TileModel(Model):
 
 
 class CassandraStore(DataStore):
-    def __init__(self, contact_points=None, port=9042):
+    def __init__(self, contact_points=None, port=9042, username=None, password=None):
         self._contact_points = contact_points
+        self._username = username
+        self._password = password
         self._port = port
         self._session = None
 
@@ -53,11 +56,18 @@ class CassandraStore(DataStore):
             raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
 
     def _get_session(self) -> Session:
+
+        if self._username and self._password:
+            auth_provider = PlainTextAuthProvider(username=self._username, password=self._password)
+        else:
+            auth_provider = None
+
         cluster = Cluster(contact_points=self._contact_points,
                           port=self._port,
                           # load_balancing_policy=
                           reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
-                          default_retry_policy=RetryPolicy())
+                          default_retry_policy=RetryPolicy(),
+                          auth_provider=auth_provider)
         session = cluster.connect()
         session.set_keyspace('nexustiles')
         return session
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 926a75c..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -59,7 +59,7 @@ class SolrStore(MetadataStore):
 
     def _get_connection(self) -> pysolr.Solr:
         if self._zk_url:
-            zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+            zk = pysolr.ZooKeeper(f"{self._zk_url}")
             collections = {}
             for c in zk.zk.get_children("collections"):
                 collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))