You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/03 20:59:39 UTC
[incubator-sdap-ingester] 09/10: solr history bug fixes
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 09d3da171c3bc84a21f3bc7a84998dfbfc52ba0d
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 27 19:31:17 2020 -0700
solr history bug fixes
---
.../history_manager/SolrIngestionHistory.py | 19 +++++++---------
granule_ingester/conda-requirements.txt | 2 +-
granule_ingester/docker/entrypoint.sh | 4 +++-
.../granule_ingester/consumer/Consumer.py | 1 +
granule_ingester/granule_ingester/main.py | 25 ++++++++++++++++++----
.../granule_ingester/pipeline/Pipeline.py | 9 +++++++-
.../granule_ingester/writers/CassandraStore.py | 14 ++++++++++--
.../granule_ingester/writers/SolrStore.py | 2 +-
8 files changed, 55 insertions(+), 21 deletions(-)
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 1ae7156..4e6d3e5 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -35,8 +35,8 @@ class SolrIngestionHistory(IngestionHistory):
try:
self._solr_url = solr_url
self._create_collection_if_needed()
- self._solr_granules = pysolr.Solr('/'.join([solr_url.strip('/'), self._granule_collection_name]))
- self._solr_datasets = pysolr.Solr('/'.join([solr_url.strip('/'), self._dataset_collection_name]))
+ self._solr_granules = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._granule_collection_name}")
+ self._solr_datasets = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._dataset_collection_name}")
self._dataset_id = dataset_id
self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
self._latest_ingested_file_update = self._get_latest_file_update()
@@ -63,7 +63,7 @@ class SolrIngestionHistory(IngestionHistory):
self._solr_datasets.add([{
'id': self._dataset_id,
'dataset_s': self._dataset_id,
- 'latest_update_l': self._latest_ingested_file_update}])
+ 'latest_update_l': int(self._latest_ingested_file_update)}])
self._solr_datasets.commit()
def _get_latest_file_update(self):
@@ -87,8 +87,7 @@ class SolrIngestionHistory(IngestionHistory):
self._req_session = requests.session()
payload = {'action': 'CLUSTERSTATUS'}
- result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
- params=payload)
+ result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
response = result.json()
node_number = len(response['cluster']['live_nodes'])
@@ -100,12 +99,11 @@ class SolrIngestionHistory(IngestionHistory):
'name': self._granule_collection_name,
'numShards': node_number
}
- result = self._req_session.get('/'.join([self._solr_url.strip("/"), 'admin', 'collections']),
- params=payload)
+ result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
response = result.json()
logger.info(f"solr collection created {response}")
# Update schema
- schema_url = '/'.join([self._solr_url.strip('/'), self._granule_collection_name, 'schema'])
+ schema_url = f"{self._solr_url.strip('/')}/{self._granule_collection_name}/schema"
# granule_s # dataset_s so that all the granule of a dataset are less likely to be on the same shard
# self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
self._add_field(schema_url, "dataset_s", "StrField")
@@ -121,13 +119,12 @@ class SolrIngestionHistory(IngestionHistory):
'name': self._dataset_collection_name,
'numShards': node_number
}
- result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
- params=payload)
+ result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
response = result.json()
logger.info(f"solr collection created {response}")
# Update schema
# http://localhost:8983/solr/nexusdatasets/schema?_=1588555874864&wt=json
- schema_url = '/'.join([self._solr_url.strip('/'), self._dataset_collection_name, 'schema'])
+ schema_url = f"{self._solr_url.strip('/')}/{self._dataset_collection_name}/schema"
# self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
self._add_field(schema_url, "dataset_s", "StrField")
self._add_field(schema_url, "latest_update_l", "TrieLongField")
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
pyyaml==5.3.1
requests==2.23.0
aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
tenacity
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index 3a1cb9b..2b6174a 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,5 +7,7 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
$([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
$([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
- $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+ $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
+ $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
+ $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) \
$([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index d40b54c..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -104,4 +104,5 @@ class Consumer(HealthCheck):
raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
except Exception as e:
await queue_iter.close()
+ await channel.close()
raise e
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index f602da8..8b8d40f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -26,8 +26,8 @@ from granule_ingester.healthcheck import HealthCheck
from granule_ingester.writers import CassandraStore, SolrStore
-def cassandra_factory(contact_points, port):
- store = CassandraStore(contact_points, port)
+def cassandra_factory(contact_points, port, username, password):
+ store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password)
store.connect()
return store
@@ -73,6 +73,14 @@ async def main(loop):
default=9042,
metavar="PORT",
help='Cassandra port. (Default: 9042)')
+ parser.add_argument('--cassandra_username',
+ metavar="USERNAME",
+ default=None,
+ help='Cassandra username. Optional.')
+ parser.add_argument('--cassandra_password',
+ metavar="PASSWORD",
+ default=None,
+ help='Cassandra password. Optional.')
parser.add_argument('--solr_host_and_port',
default='http://localhost:8983',
metavar='HOST:PORT',
@@ -99,6 +107,8 @@ async def main(loop):
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
+ cassandra_username = args.cassandra_username
+ cassandra_password = args.cassandra_password
solr_host_and_port = args.solr_host_and_port
zk_host_and_port = args.zk_host_and_port
@@ -106,11 +116,18 @@ async def main(loop):
rabbitmq_username=args.rabbitmq_username,
rabbitmq_password=args.rabbitmq_password,
rabbitmq_queue=args.rabbitmq_queue,
- data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
+ data_store_factory=partial(cassandra_factory,
+ cassandra_contact_points,
+ cassandra_port,
+ cassandra_username,
+ cassandra_password),
metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
try:
solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
- await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
+ await run_health_checks([CassandraStore(cassandra_contact_points,
+ cassandra_port,
+ cassandra_username,
+ cassandra_password),
solr_store,
consumer])
async with consumer:
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index a667d5e..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+import os
import pickle
import time
from multiprocessing import Manager
@@ -99,6 +100,8 @@ class Pipeline:
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
+ self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
# Create a SyncManager so that we can to communicate exceptions from the
# worker processes back to the main process.
self._manager = Manager()
@@ -161,6 +164,8 @@ class Pipeline:
return processor_module
async def run(self):
+
+ logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
async with self._granule_loader as (dataset, granule_name):
start = time.perf_counter()
@@ -170,7 +175,9 @@ class Pipeline:
dataset,
self._data_store_factory,
self._metadata_store_factory,
- shared_memory)) as pool:
+ shared_memory),
+ maxtasksperchild=self._max_concurrency,
+ childconcurrency=self._max_concurrency) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 6b2cf32..cb5232b 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,6 +18,7 @@ import asyncio
import logging
import uuid
+from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session, NoHostAvailable
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
@@ -39,8 +40,10 @@ class TileModel(Model):
class CassandraStore(DataStore):
- def __init__(self, contact_points=None, port=9042):
+ def __init__(self, contact_points=None, port=9042, username=None, password=None):
self._contact_points = contact_points
+ self._username = username
+ self._password = password
self._port = port
self._session = None
@@ -53,11 +56,18 @@ class CassandraStore(DataStore):
raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
def _get_session(self) -> Session:
+
+ if self._username and self._password:
+ auth_provider = PlainTextAuthProvider(username=self._username, password=self._password)
+ else:
+ auth_provider = None
+
cluster = Cluster(contact_points=self._contact_points,
port=self._port,
# load_balancing_policy=
reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
- default_retry_policy=RetryPolicy())
+ default_retry_policy=RetryPolicy(),
+ auth_provider=auth_provider)
session = cluster.connect()
session.set_keyspace('nexustiles')
return session
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 926a75c..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -59,7 +59,7 @@ class SolrStore(MetadataStore):
def _get_connection(self) -> pysolr.Solr:
if self._zk_url:
- zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+ zk = pysolr.ZooKeeper(f"{self._zk_url}")
collections = {}
for c in zk.zk.get_children("collections"):
collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))