You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/05 22:38:20 UTC

[incubator-sdap-nexus] branch cassandra-authentication updated: cassandra

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch cassandra-authentication
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/cassandra-authentication by this push:
     new 4eccc05  cassandra
4eccc05 is described below

commit 4eccc054a7fa9c124245171826a314532db7ec10
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 15:38:09 2020 -0700

    cassandra
---
 analysis/webservice/webapp.py                      | 35 +++++++++++++++-------
 .../nexustiles/config/datastores.ini.default       |  4 ++-
 data-access/nexustiles/dao/CassandraProxy.py       | 19 ++++++++++--
 data-access/nexustiles/nexustiles.py               |  6 ++--
 4 files changed, 46 insertions(+), 18 deletions(-)

diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index adfedda..3ee2f09 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -13,19 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import ConfigParser
 import importlib
 import logging
 import sys
+from functools import partial
+
 import pkg_resources
 import tornado.web
-import webservice.algorithms_spark.NexusCalcSparkHandler
 from tornado.options import define, options, parse_command_line
 
+import webservice.algorithms_spark.NexusCalcSparkHandler
+from nexustiles.nexustiles import NexusTileService
 from webservice import NexusHandler
 from webservice.nexus_tornado.request.handlers import NexusRequestHandler
 
+
 def inject_args_in_config(args, config):
     """
         Takes command argparse arguments and push them in the config
@@ -37,9 +40,9 @@ def inject_args_in_config(args, config):
         n = t_opt.name
         first_ = n.find('_')
         if first_ > 0:
-            s, o = n[:first_], n[first_+1:]
+            s, o = n[:first_], n[first_ + 1:]
             v = t_opt.value()
-            log.info('inject argument {} = {} in configuration section {}, option {}'.format(n, v , s, o))
+            log.info('inject argument {} = {} in configuration section {}, option {}'.format(n, v, s, o))
             if not config.has_section(s):
                 config.add_section(s)
             config.set(s, o, v)
@@ -67,6 +70,10 @@ if __name__ == "__main__":
     define('solr_time_out', default=60,
            help='time out for solr requests in seconds, default (60) is ok for most deployments'
                 ' when solr performances are not good this might need to be increased')
+    define('solr_host', help='solr host and port')
+    define('cassandra_host', help='cassandra host')
+    define('cassandra_username', help='cassandra username')
+    define('cassandra_password', help='cassandra password')
 
     parse_command_line()
     algorithm_config = inject_args_in_config(options, algorithm_config)
@@ -96,22 +103,28 @@ if __name__ == "__main__":
     log.info("Initializing request ThreadPool to %s" % max_request_threads)
     request_thread_pool = tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
 
+    tile_service_factory = partial(NexusTileService, False, False, algorithm_config)
     spark_context = None
     for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
         if issubclass(clazzWrapper, webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
             if spark_context is None:
                 from pyspark.sql import SparkSession
+
                 spark = SparkSession.builder.appName("nexus-analysis").getOrCreate()
                 spark_context = spark.sparkContext
 
-            handlers.append(
-                (clazzWrapper.path, NexusRequestHandler,
-                 dict(clazz=clazzWrapper, algorithm_config=algorithm_config, sc=spark_context,
-                      thread_pool=request_thread_pool)))
+            handlers.append((clazzWrapper.path,
+                             NexusRequestHandler,
+                             dict(clazz=clazzWrapper,
+                                  tile_service_factory=tile_service_factory,
+                                  sc=spark_context,
+                                  thread_pool=request_thread_pool)))
         else:
-            handlers.append(
-                (clazzWrapper.path, NexusRequestHandler,
-                 dict(clazz=clazzWrapper, thread_pool=request_thread_pool)))
+            handlers.append((clazzWrapper.path,
+                             NexusRequestHandler,
+                             dict(clazz=clazzWrapper,
+                                  tile_service_factory=tile_service_factory,
+                                  thread_pool=request_thread_pool)))
 
 
     class VersionHandler(tornado.web.RequestHandler):
diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/config/datastores.ini.default
index 0fe8d9d..63d08af 100644
--- a/data-access/nexustiles/config/datastores.ini.default
+++ b/data-access/nexustiles/config/datastores.ini.default
@@ -5,6 +5,8 @@ keyspace=nexustiles
 local_datacenter=datacenter1
 protocol_version=3
 dc_policy=DCAwareRoundRobinPolicy
+username=
+password=
 
 [s3]
 bucket=nexus-jpl
@@ -15,7 +17,7 @@ table=nexus-jpl-table
 region=us-west-2
 
 [solr]
-host=sdap-solr:8983
+host=localhost:8983
 core=nexustiles
 
 [datastore]
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py
index ed37c5c..9a38e29 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/CassandraProxy.py
@@ -13,19 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import uuid
 from ConfigParser import NoOptionError
-from multiprocessing.synchronize import Lock
 
 import nexusproto.DataTile_pb2 as nexusproto
 import numpy as np
+from cassandra.auth import PlainTextAuthProvider
 from cassandra.cqlengine import columns, connection, CQLEngineException
 from cassandra.cqlengine.models import Model
 from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
+from multiprocessing.synchronize import Lock
 from nexusproto.serialization import from_shaped_array
 
 INIT_LOCK = Lock()
 
+logger = logging.getLogger(__name__)
 
 class NexusTileData(Model):
     __table_name__ = 'sea_surface_temp'
@@ -151,11 +154,16 @@ class CassandraProxy(object):
     def __init__(self, config):
         self.config = config
         self.__cass_url = config.get("cassandra", "host")
+        self.__cass_username = config.get("cassandra", "username")
+        self.__cass_password = config.get("cassandra", "password")
         self.__cass_keyspace = config.get("cassandra", "keyspace")
         self.__cass_local_DC = config.get("cassandra", "local_datacenter")
         self.__cass_protocol_version = config.getint("cassandra", "protocol_version")
         self.__cass_dc_policy = config.get("cassandra", "dc_policy")
 
+        logger.info("Setting cassandra host to " + self.__cass_url)
+        logger.info("Setting cassandra username to " + self.__cass_username)
+
         try:
             self.__cass_port = config.getint("cassandra", "port")
         except NoOptionError:
@@ -168,16 +176,21 @@ class CassandraProxy(object):
                 self.__open()
 
     def __open(self):
-
+        logger.info("Connecting to cassandra at " + self.__cass_url)
         if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy':
             dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC)
         elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy':
             dc_policy = WhiteListRoundRobinPolicy([self.__cass_url])
 
+        if self.__cass_username and self.__cass_password:
+            auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password)
+        else:
+            auth_provider = None
         token_policy = TokenAwarePolicy(dc_policy)
         connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace,
                          protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy,
-                         port=self.__cass_port)
+                         port=self.__cass_port,
+                         auth_provider=auth_provider)
 
     def fetch_nexus_tiles(self, *tile_ids):
         tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 24db1ae..3e7e2f8 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -102,10 +102,10 @@ class NexusTileService(object):
 
     def override_config(self, config):
         for section in config.sections():
-            if self._config.has_section(section): # only override preexisting section, ignores the other
+            if self._config.has_section(section):  # only override preexisting section, ignores the other
                 for option in config.options(section):
-                    self._config.set(section, option, config.get(section, option))
-
+                    if config.get(section, option) is not None:
+                        self._config.set(section, option, config.get(section, option))
 
     def get_dataseries_list(self, simple=False):
         if simple: