You are viewing a plain text version of this content. The canonical (HTML) version is available at the original mailing-list archive link, which is not preserved in this plain-text copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/05 22:38:20 UTC
[incubator-sdap-nexus] branch cassandra-authentication updated: cassandra
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch cassandra-authentication
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/cassandra-authentication by this push:
new 4eccc05 cassandra
4eccc05 is described below
commit 4eccc054a7fa9c124245171826a314532db7ec10
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 15:38:09 2020 -0700
cassandra
---
analysis/webservice/webapp.py | 35 +++++++++++++++-------
.../nexustiles/config/datastores.ini.default | 4 ++-
data-access/nexustiles/dao/CassandraProxy.py | 19 ++++++++++--
data-access/nexustiles/nexustiles.py | 6 ++--
4 files changed, 46 insertions(+), 18 deletions(-)
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index adfedda..3ee2f09 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -13,19 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
import ConfigParser
import importlib
import logging
import sys
+from functools import partial
+
import pkg_resources
import tornado.web
-import webservice.algorithms_spark.NexusCalcSparkHandler
from tornado.options import define, options, parse_command_line
+import webservice.algorithms_spark.NexusCalcSparkHandler
+from nexustiles.nexustiles import NexusTileService
from webservice import NexusHandler
from webservice.nexus_tornado.request.handlers import NexusRequestHandler
+
def inject_args_in_config(args, config):
"""
Takes command argparse arguments and push them in the config
@@ -37,9 +40,9 @@ def inject_args_in_config(args, config):
n = t_opt.name
first_ = n.find('_')
if first_ > 0:
- s, o = n[:first_], n[first_+1:]
+ s, o = n[:first_], n[first_ + 1:]
v = t_opt.value()
- log.info('inject argument {} = {} in configuration section {}, option {}'.format(n, v , s, o))
+ log.info('inject argument {} = {} in configuration section {}, option {}'.format(n, v, s, o))
if not config.has_section(s):
config.add_section(s)
config.set(s, o, v)
@@ -67,6 +70,10 @@ if __name__ == "__main__":
define('solr_time_out', default=60,
help='time out for solr requests in seconds, default (60) is ok for most deployments'
' when solr performances are not good this might need to be increased')
+ define('solr_host', help='solr host and port')
+ define('cassandra_host', help='cassandra host')
+ define('cassandra_username', help='cassandra username')
+ define('cassandra_password', help='cassandra password')
parse_command_line()
algorithm_config = inject_args_in_config(options, algorithm_config)
@@ -96,22 +103,28 @@ if __name__ == "__main__":
log.info("Initializing request ThreadPool to %s" % max_request_threads)
request_thread_pool = tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
+ tile_service_factory = partial(NexusTileService, False, False, algorithm_config)
spark_context = None
for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
if issubclass(clazzWrapper, webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
if spark_context is None:
from pyspark.sql import SparkSession
+
spark = SparkSession.builder.appName("nexus-analysis").getOrCreate()
spark_context = spark.sparkContext
- handlers.append(
- (clazzWrapper.path, NexusRequestHandler,
- dict(clazz=clazzWrapper, algorithm_config=algorithm_config, sc=spark_context,
- thread_pool=request_thread_pool)))
+ handlers.append((clazzWrapper.path,
+ NexusRequestHandler,
+ dict(clazz=clazzWrapper,
+ tile_service_factory=tile_service_factory,
+ sc=spark_context,
+ thread_pool=request_thread_pool)))
else:
- handlers.append(
- (clazzWrapper.path, NexusRequestHandler,
- dict(clazz=clazzWrapper, thread_pool=request_thread_pool)))
+ handlers.append((clazzWrapper.path,
+ NexusRequestHandler,
+ dict(clazz=clazzWrapper,
+ tile_service_factory=tile_service_factory,
+ thread_pool=request_thread_pool)))
class VersionHandler(tornado.web.RequestHandler):
diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/config/datastores.ini.default
index 0fe8d9d..63d08af 100644
--- a/data-access/nexustiles/config/datastores.ini.default
+++ b/data-access/nexustiles/config/datastores.ini.default
@@ -5,6 +5,8 @@ keyspace=nexustiles
local_datacenter=datacenter1
protocol_version=3
dc_policy=DCAwareRoundRobinPolicy
+username=
+password=
[s3]
bucket=nexus-jpl
@@ -15,7 +17,7 @@ table=nexus-jpl-table
region=us-west-2
[solr]
-host=sdap-solr:8983
+host=localhost:8983
core=nexustiles
[datastore]
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py
index ed37c5c..9a38e29 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/CassandraProxy.py
@@ -13,19 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import uuid
from ConfigParser import NoOptionError
-from multiprocessing.synchronize import Lock
import nexusproto.DataTile_pb2 as nexusproto
import numpy as np
+from cassandra.auth import PlainTextAuthProvider
from cassandra.cqlengine import columns, connection, CQLEngineException
from cassandra.cqlengine.models import Model
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
+from multiprocessing.synchronize import Lock
from nexusproto.serialization import from_shaped_array
INIT_LOCK = Lock()
+logger = logging.getLogger(__name__)
class NexusTileData(Model):
__table_name__ = 'sea_surface_temp'
@@ -151,11 +154,16 @@ class CassandraProxy(object):
def __init__(self, config):
self.config = config
self.__cass_url = config.get("cassandra", "host")
+ self.__cass_username = config.get("cassandra", "username")
+ self.__cass_password = config.get("cassandra", "password")
self.__cass_keyspace = config.get("cassandra", "keyspace")
self.__cass_local_DC = config.get("cassandra", "local_datacenter")
self.__cass_protocol_version = config.getint("cassandra", "protocol_version")
self.__cass_dc_policy = config.get("cassandra", "dc_policy")
+ logger.info("Setting cassandra host to " + self.__cass_url)
+ logger.info("Setting cassandra username to " + self.__cass_username)
+
try:
self.__cass_port = config.getint("cassandra", "port")
except NoOptionError:
@@ -168,16 +176,21 @@ class CassandraProxy(object):
self.__open()
def __open(self):
-
+ logger.info("Connecting to cassandra at " + self.__cass_url)
if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy':
dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC)
elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy':
dc_policy = WhiteListRoundRobinPolicy([self.__cass_url])
+ if self.__cass_username and self.__cass_password:
+ auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password)
+ else:
+ auth_provider = None
token_policy = TokenAwarePolicy(dc_policy)
connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace,
protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy,
- port=self.__cass_port)
+ port=self.__cass_port,
+ auth_provider=auth_provider)
def fetch_nexus_tiles(self, *tile_ids):
tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 24db1ae..3e7e2f8 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -102,10 +102,10 @@ class NexusTileService(object):
def override_config(self, config):
for section in config.sections():
- if self._config.has_section(section): # only override preexisting section, ignores the other
+ if self._config.has_section(section): # only override preexisting section, ignores the other
for option in config.options(section):
- self._config.set(section, option, config.get(section, option))
-
+ if config.get(section, option) is not None:
+ self._config.set(section, option, config.get(section, option))
def get_dataseries_list(self, simple=False):
if simple: