You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this text.
Posted to commits@sdap.apache.org by nc...@apache.org on 2021/03/15 19:41:04 UTC

[incubator-sdap-nexus] branch master updated: SDAP-305: Get current match up working with AVHRR OI data that is currently ingested in the bigdata cluster (#123)

This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 84480ab  SDAP-305: Get current match up working with AVHRR OI data that is currently ingested in the bigdata cluster (#123)
84480ab is described below

commit 84480ab35ae2f0f3e468487aee751d0a0c7902aa
Author: Stepheny Perez <sk...@users.noreply.github.com>
AuthorDate: Mon Mar 15 12:40:54 2021 -0700

    SDAP-305: Get current match up working with AVHRR OI data that is currently ingested in the bigdata cluster (#123)
    
    * CDMS-47: Only wrap the load-balancing policy in TokenAwarePolicy when using DCAwareRoundRobinPolicy to set up the Cassandra connection pool
    
    * CDMS-47: Always use AuthProvider when connecting to Cassandra
    
    * CDMS-47: Fixed bugs found while testing matchup
    
    * CDMS-47: Update python version in analysis setup.py to 3.7
    
    * SDAP-305: Handle Solr 'tile_min/max_lat/lon' fields that may be returned as lists
    
    * SDAP-305: Clean up matchup
    
    * SDAP-305: Return instead of raising StopIteration exception in spark Matchup
    
    * SDAP-305: Return instead of raising StopIteration exception in spark Matchup
---
 analysis/setup.py                                      |  2 +-
 .../webservice/algorithms/doms/DomsInitialization.py   |  4 ++--
 analysis/webservice/algorithms/doms/ResultsStorage.py  | 10 ++++++++--
 analysis/webservice/algorithms_spark/Matchup.py        | 13 ++++++++-----
 data-access/nexustiles/dao/CassandraProxy.py           |  5 +++--
 data-access/nexustiles/nexustiles.py                   | 18 +++++++++++++++---
 6 files changed, 37 insertions(+), 15 deletions(-)

diff --git a/analysis/setup.py b/analysis/setup.py
index 62a6891..3eced72 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -57,7 +57,7 @@ setuptools.setup(
         ('static', ['static/index.html'])
     ],
     platforms='any',
-    python_requires='~=2.7',
+    python_requires='~=3.7',
     classifiers=[
         'Development Status :: 1 - Pre-Alpha',
         'Intended Audience :: Developers',
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index f53c74e..097b4a8 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -56,9 +56,9 @@ class DomsInitializer:
 
         if cassPolicy == 'DCAwareRoundRobinPolicy':
             dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+            token_policy = TokenAwarePolicy(dc_policy)
         elif cassPolicy == 'WhiteListRoundRobinPolicy':
-            dc_policy = WhiteListRoundRobinPolicy([cassHost])
-        token_policy = TokenAwarePolicy(dc_policy)
+            token_policy = WhiteListRoundRobinPolicy([cassHost])
 
         if cassUsername and cassPassword:
             auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index 7ffe3ea..478fef4 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -24,6 +24,7 @@ import pkg_resources
 from cassandra.cluster import Cluster
 from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
 from cassandra.query import BatchStatement
+from cassandra.auth import PlainTextAuthProvider
 from pytz import UTC
 
 
@@ -36,18 +37,23 @@ class AbstractResultsContainer:
 
     def __enter__(self):
         domsconfig = configparser.RawConfigParser()
-        domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini')
+        domsconfig.read_string(
+            pkg_resources.resource_string(__name__, "domsconfig.ini").decode('utf-8'))
 
         cassHost = domsconfig.get("cassandra", "host")
         cassKeyspace = domsconfig.get("cassandra", "keyspace")
         cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
         cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+        cassUsername = domsconfig.get("cassandra", "username")
+        cassPassword = domsconfig.get("cassandra", "password")
+
+        auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
 
         dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
         token_policy = TokenAwarePolicy(dc_policy)
 
         self._cluster = Cluster([host for host in cassHost.split(',')], load_balancing_policy=token_policy,
-                                protocol_version=cassVersion)
+                                protocol_version=cassVersion, auth_provider=auth_provider)
 
         self._session = self._cluster.connect(cassKeyspace)
         return self
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 914d570..6633aee 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -131,8 +131,8 @@ class Matchup(NexusCalcSparkHandler):
     }
     singleton = True
 
-    def __init__(self, algorithm_config=None, sc=None):
-        NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc)
+    def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None):
+        NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory)
         self.log = logging.getLogger(__name__)
 
     def parse_arguments(self, request):
@@ -227,6 +227,7 @@ class Matchup(NexusCalcSparkHandler):
                                                              sort=['tile_min_time_dt asc', 'tile_min_lon asc',
                                                                    'tile_min_lat asc'], rows=5000)]
 
+        self.log.debug('Found %s tile_ids', len(tile_ids))
         # Call spark_matchup
         self.log.debug("Calling Spark Driver")
         try:
@@ -617,7 +618,8 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
         print("%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id))
     except IndexError:
         # This should only happen if all measurements in a tile become masked after applying the bounding polygon
-        raise StopIteration
+        print('Tile is empty after masking spatially. Skipping this tile.')
+        return
 
     # Convert valid tile lat,lon tuples to UTM tuples
     the_time = datetime.now()
@@ -677,10 +679,11 @@ def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min,
               "platform": platform,
               "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
 
+    dataset_url = edge_endpoints.getEndpointByName(dataset)['url']
     if session is not None:
-        edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+        edge_request = session.get(dataset_url, params=params)
     else:
-        edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+        edge_request = requests.get(dataset_url, params=params)
 
     edge_request.raise_for_status()
     edge_response = json.loads(edge_request.text)
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py
index e24253e..86a8469 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/CassandraProxy.py
@@ -175,14 +175,15 @@ class CassandraProxy(object):
     def __open(self):
         if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy':
             dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC)
+            token_policy = TokenAwarePolicy(dc_policy)
         elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy':
-            dc_policy = WhiteListRoundRobinPolicy([self.__cass_url])
+            token_policy = WhiteListRoundRobinPolicy([self.__cass_url])
 
         if self.__cass_username and self.__cass_password:
             auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password)
         else:
             auth_provider = None
-        token_policy = TokenAwarePolicy(dc_policy)
+
         connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace,
                          protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy,
                          port=self.__cass_port,
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 2f2689b..64361e3 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -460,9 +460,21 @@ class NexusTileService(object):
                 pass
 
             try:
-                tile.bbox = BBox(
-                    solr_doc['tile_min_lat'], solr_doc['tile_max_lat'],
-                    solr_doc['tile_min_lon'], solr_doc['tile_max_lon'])
+                min_lat = solr_doc['tile_min_lat']
+                min_lon = solr_doc['tile_min_lon']
+                max_lat = solr_doc['tile_max_lat']
+                max_lon = solr_doc['tile_max_lon']
+
+                if isinstance(min_lat, list):
+                    min_lat = min_lat[0]
+                if isinstance(min_lon, list):
+                    min_lon = min_lon[0]
+                if isinstance(max_lat, list):
+                    max_lat = max_lat[0]
+                if isinstance(max_lon, list):
+                    max_lon = max_lon[0]
+
+                tile.bbox = BBox(min_lat, max_lat, min_lon, max_lon)
             except KeyError:
                 pass