You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@sdap.apache.org by nc...@apache.org on 2021/03/15 19:41:04 UTC
[incubator-sdap-nexus] branch master updated: SDAP-305: Get current match up working with AVHRR OI data that is currently ingested in the bigdata cluster (#123)
This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 84480ab SDAP-305: Get current match up working with AVHRR OI data that is currently ingested in the bigdata cluster (#123)
84480ab is described below
commit 84480ab35ae2f0f3e468487aee751d0a0c7902aa
Author: Stepheny Perez <sk...@users.noreply.github.com>
AuthorDate: Mon Mar 15 12:40:54 2021 -0700
SDAP-305: Get current match up working with AVHRR OI data that is currently ingested in the bigdata cluster (#123)
* CDMS-47: Only wrap the load-balancing policy in TokenAwarePolicy when using DCAwareRoundRobinPolicy to set up the Cassandra connection pool
* CDMS-47: Always use AuthProvider when connecting to Cassandra
* CDMS-47: Fixed bugs found while testing matchup
* CDMS-47: Update python version in analysis setup.py to 3.7
* SDAP-305: Handle Solr 'tile_min/max_lat/lon' fields that may be returned as lists
* SDAP-305: Clean up matchup
* SDAP-305: Return instead of raising StopIteration exception in spark Matchup
* SDAP-305: Return instead of raising StopIteration exception in spark Matchup
---
analysis/setup.py | 2 +-
.../webservice/algorithms/doms/DomsInitialization.py | 4 ++--
analysis/webservice/algorithms/doms/ResultsStorage.py | 10 ++++++++--
analysis/webservice/algorithms_spark/Matchup.py | 13 ++++++++-----
data-access/nexustiles/dao/CassandraProxy.py | 5 +++--
data-access/nexustiles/nexustiles.py | 18 +++++++++++++++---
6 files changed, 37 insertions(+), 15 deletions(-)
diff --git a/analysis/setup.py b/analysis/setup.py
index 62a6891..3eced72 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -57,7 +57,7 @@ setuptools.setup(
('static', ['static/index.html'])
],
platforms='any',
- python_requires='~=2.7',
+ python_requires='~=3.7',
classifiers=[
'Development Status :: 1 - Pre-Alpha',
'Intended Audience :: Developers',
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index f53c74e..097b4a8 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -56,9 +56,9 @@ class DomsInitializer:
if cassPolicy == 'DCAwareRoundRobinPolicy':
dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+ token_policy = TokenAwarePolicy(dc_policy)
elif cassPolicy == 'WhiteListRoundRobinPolicy':
- dc_policy = WhiteListRoundRobinPolicy([cassHost])
- token_policy = TokenAwarePolicy(dc_policy)
+ token_policy = WhiteListRoundRobinPolicy([cassHost])
if cassUsername and cassPassword:
auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index 7ffe3ea..478fef4 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -24,6 +24,7 @@ import pkg_resources
from cassandra.cluster import Cluster
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
from cassandra.query import BatchStatement
+from cassandra.auth import PlainTextAuthProvider
from pytz import UTC
@@ -36,18 +37,23 @@ class AbstractResultsContainer:
def __enter__(self):
domsconfig = configparser.RawConfigParser()
- domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini')
+ domsconfig.read_string(
+ pkg_resources.resource_string(__name__, "domsconfig.ini").decode('utf-8'))
cassHost = domsconfig.get("cassandra", "host")
cassKeyspace = domsconfig.get("cassandra", "keyspace")
cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+ cassUsername = domsconfig.get("cassandra", "username")
+ cassPassword = domsconfig.get("cassandra", "password")
+
+ auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
token_policy = TokenAwarePolicy(dc_policy)
self._cluster = Cluster([host for host in cassHost.split(',')], load_balancing_policy=token_policy,
- protocol_version=cassVersion)
+ protocol_version=cassVersion, auth_provider=auth_provider)
self._session = self._cluster.connect(cassKeyspace)
return self
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 914d570..6633aee 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -131,8 +131,8 @@ class Matchup(NexusCalcSparkHandler):
}
singleton = True
- def __init__(self, algorithm_config=None, sc=None):
- NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc)
+ def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None):
+ NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory)
self.log = logging.getLogger(__name__)
def parse_arguments(self, request):
@@ -227,6 +227,7 @@ class Matchup(NexusCalcSparkHandler):
sort=['tile_min_time_dt asc', 'tile_min_lon asc',
'tile_min_lat asc'], rows=5000)]
+ self.log.debug('Found %s tile_ids', len(tile_ids))
# Call spark_matchup
self.log.debug("Calling Spark Driver")
try:
@@ -617,7 +618,8 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
print("%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id))
except IndexError:
# This should only happen if all measurements in a tile become masked after applying the bounding polygon
- raise StopIteration
+ print('Tile is empty after masking spatially. Skipping this tile.')
+ return
# Convert valid tile lat,lon tuples to UTM tuples
the_time = datetime.now()
@@ -677,10 +679,11 @@ def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min,
"platform": platform,
"itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+ dataset_url = edge_endpoints.getEndpointByName(dataset)['url']
if session is not None:
- edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+ edge_request = session.get(dataset_url, params=params)
else:
- edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+ edge_request = requests.get(dataset_url, params=params)
edge_request.raise_for_status()
edge_response = json.loads(edge_request.text)
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py
index e24253e..86a8469 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/CassandraProxy.py
@@ -175,14 +175,15 @@ class CassandraProxy(object):
def __open(self):
if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy':
dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC)
+ token_policy = TokenAwarePolicy(dc_policy)
elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy':
- dc_policy = WhiteListRoundRobinPolicy([self.__cass_url])
+ token_policy = WhiteListRoundRobinPolicy([self.__cass_url])
if self.__cass_username and self.__cass_password:
auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password)
else:
auth_provider = None
- token_policy = TokenAwarePolicy(dc_policy)
+
connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace,
protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy,
port=self.__cass_port,
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 2f2689b..64361e3 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -460,9 +460,21 @@ class NexusTileService(object):
pass
try:
- tile.bbox = BBox(
- solr_doc['tile_min_lat'], solr_doc['tile_max_lat'],
- solr_doc['tile_min_lon'], solr_doc['tile_max_lon'])
+ min_lat = solr_doc['tile_min_lat']
+ min_lon = solr_doc['tile_min_lon']
+ max_lat = solr_doc['tile_max_lat']
+ max_lon = solr_doc['tile_max_lon']
+
+ if isinstance(min_lat, list):
+ min_lat = min_lat[0]
+ if isinstance(min_lon, list):
+ min_lon = min_lon[0]
+ if isinstance(max_lat, list):
+ max_lat = max_lat[0]
+ if isinstance(max_lon, list):
+ max_lon = max_lon[0]
+
+ tile.bbox = BBox(min_lat, max_lat, min_lon, max_lon)
except KeyError:
pass