You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/08/01 22:03:42 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-185] add
consul service and config registration in python wrapper
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 8478a5f [STREAMPIPES-185] add consul service and config registration in python wrapper
8478a5f is described below
commit 8478a5fd823f0186c826760426e4204b2b17027b
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Sun Aug 2 00:03:25 2020 +0200
[STREAMPIPES-185] add consul service and config registration in python wrapper
---
streampipes-wrapper-python/requirements.txt | 2 +
streampipes-wrapper-python/setup.py | 6 +-
streampipes-wrapper-python/streampipes/api.py | 25 +++--
.../streampipes/{configuration.py => banner.py} | 9 +-
streampipes-wrapper-python/streampipes/core.py | 41 ++++----
streampipes-wrapper-python/streampipes/manager.py | 4 +-
.../streampipes/model/__init__.py | 0
.../{helper.py => model/config_item.py} | 25 +++--
.../streampipes/model/processor_config.py | 50 ++++++++++
.../streampipes/utils/__init__.py | 0
.../streampipes/utils/register.py | 109 +++++++++++++++++++++
11 files changed, 223 insertions(+), 48 deletions(-)
diff --git a/streampipes-wrapper-python/requirements.txt b/streampipes-wrapper-python/requirements.txt
index 1008c86..27e0195 100644
--- a/streampipes-wrapper-python/requirements.txt
+++ b/streampipes-wrapper-python/requirements.txt
@@ -6,3 +6,5 @@ Jinja2==2.11.2
MarkupSafe==1.1.1
waitress==1.4.4
Werkzeug==1.0.1
+
+setuptools~=49.2.0
\ No newline at end of file
diff --git a/streampipes-wrapper-python/setup.py b/streampipes-wrapper-python/setup.py
index dcef38c..63b792f 100644
--- a/streampipes-wrapper-python/setup.py
+++ b/streampipes-wrapper-python/setup.py
@@ -16,7 +16,7 @@
#
""""""
-from setuptools import setup
+from setuptools import setup, find_packages
import io
import os
@@ -28,7 +28,8 @@ with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') a
setup(
name='apache-streampipes-python',
version='0.67.0-SNAPSHOT',
- packages=["streampipes"],
+ #packages=["streampipes"],
+ packages=find_packages(),
url='https://github.com/apache/incubator-streampipes',
license='https://www.apache.org/licenses/LICENSE-2.0',
license_files=["LICENSE", "NOTICE"],
@@ -41,6 +42,7 @@ setup(
'confluent-kafka==1.4.2',
'Flask==1.1.2',
'waitress==1.4.4',
+ 'python-consul==1.1.0'
],
tests_require=[],
python_requires='>=3.5',
diff --git a/streampipes-wrapper-python/streampipes/api.py b/streampipes-wrapper-python/streampipes/api.py
index f6baf08..36f3735 100644
--- a/streampipes-wrapper-python/streampipes/api.py
+++ b/streampipes-wrapper-python/streampipes/api.py
@@ -15,10 +15,11 @@
# limitations under the License.
#
""" API endpoints """
-from flask import request, jsonify, Flask
+import json
+
+from flask import request, jsonify, Flask, Response, make_response
from waitress import serve
-from streampipes.configuration import flask_host, port
from streampipes.manager import ProcessorDispatcher
@@ -28,23 +29,29 @@ class EndpointAction(object):
def __call__(self, *args, **kwargs):
""" call corresponding handler method """
- response = self.action(**request.get_json())
- return jsonify(response)
+ if request.get_json() is not None:
+ resp = self.action(**request.get_json())
+ else:
+ resp = self.action()
+
+ return make_response(jsonify(resp), 200)
class API(object):
""" EventProcessorAPI contains relevant RESTful endpoints to start and stop """
app = None
- def __init__(self, name='python-processor'):
- self.app = Flask(name)
+ def __init__(self, port: int):
+ self.app = Flask('python-processor')
+ self.port = port
self.add_endpoints()
def run(self):
- serve(self.app, host=flask_host, port=port)
+ serve(self.app, host='0.0.0.0', port=self.port)
def add_endpoints(self):
""" define and add event processor API endpoints """
+ self.add_endpoint(endpoint='/', endpoint_name='/', methods=['GET'], handler=self.welcome)
self.add_endpoint(endpoint='/invoke', endpoint_name='/invoke', methods=['POST'], handler=self.invoke)
self.add_endpoint(endpoint='/detach', endpoint_name='/detach', methods=['POST'], handler=self.detach)
@@ -52,6 +59,10 @@ class API(object):
self.app.add_url_rule(endpoint, endpoint_name, EndpointAction(handler), methods=methods)
@staticmethod
+ def welcome():
+ return {'welcome': 'hello-world!'}
+
+ @staticmethod
def invoke(**kwargs):
""" Receives invocation graph from pipeline management in the backend
diff --git a/streampipes-wrapper-python/streampipes/configuration.py b/streampipes-wrapper-python/streampipes/banner.py
similarity index 87%
rename from streampipes-wrapper-python/streampipes/configuration.py
rename to streampipes-wrapper-python/streampipes/banner.py
index 719b6e1..5bb8e75 100644
--- a/streampipes-wrapper-python/streampipes/configuration.py
+++ b/streampipes-wrapper-python/streampipes/banner.py
@@ -14,14 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-"""config file"""
-import os
+"""banner"""
-flask_host = os.getenv('FLASK_HOST', '0.0.0.0')
-port = os.getenv('SP_PORT', 5000)
-kafka_consumer_thread = 'kafka_consumer_thread'
-
-banner="""\
+banner = """\
_______ __ ______ __
| __| |_.----.-----.---.-.--------.| __ \__|.-----.-----.-----.
|__ | _| _| -__| _ | || __/ || _ | -__|__ --|
diff --git a/streampipes-wrapper-python/streampipes/core.py b/streampipes-wrapper-python/streampipes/core.py
index c31cd9e..08bb665 100644
--- a/streampipes-wrapper-python/streampipes/core.py
+++ b/streampipes-wrapper-python/streampipes/core.py
@@ -17,25 +17,24 @@
"""contains relevant base classes"""
import json
import logging
+import threading
from abc import ABC, abstractmethod
from confluent_kafka.admin import AdminClient
-
-from streampipes.api import API
-from streampipes.configuration import banner, kafka_consumer_thread
-from streampipes.helper import threaded
from confluent_kafka import Producer, Consumer
+from streampipes.api import API
+from streampipes.banner import banner
+from streampipes.model.processor_config import Config
+from streampipes.utils.register import ConsulUtils
-class StandaloneSubmitter(ABC):
- @classmethod
- def init(cls):
- cls._load_banner()
- cls.api = API()
- cls.api.run()
-
+class StandaloneModelSubmitter(ABC):
+    """Entry point that serves the processor API and registers it in Consul."""
+
     @classmethod
-    def _load_banner(cls):
+    def init(cls, config: Config):
+        """Print the banner, start the API server on a background thread, then
+        register the service in Consul.
+
+        :param config: supplies ``port``, ``host`` and ``app_id``
+        """
     print(banner)
+        api = API(port=config.port)
+        # Pass the bound method itself: ``target=api.run()`` would run the
+        # server on the calling thread and the registration below would never
+        # be reached while the server is up.
+        threading.Thread(target=api.run).start()
+
+        ConsulUtils.register_service(app_id=config.app_id, host=config.host, port=int(config.port))
class EventProcessor(ABC):
@@ -82,7 +81,9 @@ class EventProcessor(ABC):
def init(self):
self.logger.info('start processor {}'.format(self.invocation_id))
- self._threads[kafka_consumer_thread] = self._consume(self._input_topics)
+ thread = threading.Thread(target=self.__consume, name=self.invocation_id)
+ thread.start()
+ self._threads['kafka'] = thread
def active_threads(self):
return self._threads
@@ -108,16 +109,16 @@ class EventProcessor(ABC):
""" on_detach is called when processor is stopped """
pass
- def _on_event(self, event):
+ def __on_event(self, event):
result = self.on_event(event)
if result is not None:
- self._produce(result)
+ self.__produce(result)
- @threaded
- def _consume(self, topics):
+ #@threaded
+ def __consume(self):
""" retrieve events from kafka """
- self._consumer.subscribe(topics=[topics])
+ self._consumer.subscribe(topics=[self._input_topics])
self._running = True
while self._running:
@@ -141,9 +142,9 @@ class EventProcessor(ABC):
self.logger.info("Not a valid json {}".format(e))
continue
- self._on_event(event)
+ self.__on_event(event)
- def _produce(self, result):
+ def __produce(self, result):
""" send events to kafka """
event = json.dumps(result).encode('utf-8')
try:
diff --git a/streampipes-wrapper-python/streampipes/manager.py b/streampipes-wrapper-python/streampipes/manager.py
index 64a2a4e..e151e8d 100644
--- a/streampipes-wrapper-python/streampipes/manager.py
+++ b/streampipes-wrapper-python/streampipes/manager.py
@@ -18,8 +18,6 @@
import logging
from abc import ABC
-from streampipes.configuration import kafka_consumer_thread
-
class Declarer(ABC):
""" EventProcessorManager holds running processor instances """
@@ -66,7 +64,7 @@ class ProcessorDispatcher(ABC):
processor = cls._running_instances[invocation_id]
active_threads = processor.active_threads()
processor.stop()
- active_threads[kafka_consumer_thread].join()
+ active_threads['kafka'].join()
del processor
cls._running_instances.pop(invocation_id)
diff --git a/streampipes-wrapper-python/streampipes/model/__init__.py b/streampipes-wrapper-python/streampipes/model/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/streampipes-wrapper-python/streampipes/helper.py b/streampipes-wrapper-python/streampipes/model/config_item.py
similarity index 62%
rename from streampipes-wrapper-python/streampipes/helper.py
rename to streampipes-wrapper-python/streampipes/model/config_item.py
index 44a6b44..8530db7 100644
--- a/streampipes-wrapper-python/streampipes/helper.py
+++ b/streampipes-wrapper-python/streampipes/model/config_item.py
@@ -14,15 +14,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-"""some helper methods"""
-import threading
+import json
-def threaded(func):
- def wrapper(*args, **kwargs):
- thread_name = args[0]._invocation_id
- thread = threading.Thread(target=func, args=args, kwargs=kwargs, name=thread_name)
- thread.start()
- return thread
+class ConfigItem(object):
+    """One configuration entry; ``to_json`` emits its fields with camelCase keys."""
+
+    def __init__(self):
+        # All fields start as None.
+        self.value = None
+        self.value_type = None
+        self.description = None
+        self.configuration_scope = None
+        self.is_password = None
-    return wrapper
\ No newline at end of file
+    def to_json(self):
+        """Return the instance attributes as a JSON object string.
+
+        snake_case attribute names become camelCase keys, e.g. ``value_type``
+        is written as ``valueType``.
+        """
+        d = {}
+        for k, v in self.__dict__.items():
+            head, *rest = k.split('_')
+            # capitalize() upper-cases only the first character of a segment;
+            # title() would also upper-case a letter that follows a digit.
+            d[head + ''.join(part.capitalize() for part in rest)] = v
+
+        return json.dumps(d)
\ No newline at end of file
diff --git a/streampipes-wrapper-python/streampipes/model/processor_config.py b/streampipes-wrapper-python/streampipes/model/processor_config.py
new file mode 100644
index 0000000..702e206
--- /dev/null
+++ b/streampipes-wrapper-python/streampipes/model/processor_config.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import os
+
+from streampipes.utils.register import ConsulUtils
+
+
+class Config(object):
+    """Processor settings resolved from the environment and mirrored to Consul."""
+
+    def __init__(self, app_id: str) -> None:
+        self.logger = logging.getLogger(__name__)
+        self.app_id = app_id
+        self.host: str  # annotation only; assigned by register()
+        self.port: int
+        self.service: str
+
+    def register(self, type: str, env_key: str, default, description: str, configuration_scope=None, is_password=None):
+        """Resolve one setting and publish it through ConsulUtils.register_config.
+
+        ``type`` values 'host', 'port' and 'service' store the resolved value on
+        the matching attribute; any other value is only forwarded to Consul.
+        """
+        # Compare by value: ``is`` on str literals depends on interning.
+        if type in ('host', 'port', 'service'):
+            setattr(self, type, self.__env_or_default(env_key, default))
+
+        ConsulUtils.register_config(self.app_id, env_key, default, description, configuration_scope, is_password)
+
+    @staticmethod
+    def __env_or_default(key, default):
+        """Return the non-empty environment value for ``key``, else ``default``."""
+        if key is None:
+            return default
+        return os.getenv(key) or default
\ No newline at end of file
diff --git a/streampipes-wrapper-python/streampipes/utils/__init__.py b/streampipes-wrapper-python/streampipes/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/streampipes-wrapper-python/streampipes/utils/register.py b/streampipes-wrapper-python/streampipes/utils/register.py
new file mode 100644
index 0000000..117e102
--- /dev/null
+++ b/streampipes-wrapper-python/streampipes/utils/register.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+""""""
+import os
+import consul
+from consul import Check
+
+from streampipes.model.config_item import ConfigItem
+
+
+class ConsulUtils(object):
+    """Class-level helpers that register this service and its config in Consul."""
+    _DEFAULT_CONSUL_CONFIG = {
+        'CONSUL_HOST': 'consul',
+        'CONSUL_PORT': 8500,
+        'HEALTHCHECK_INTERVAL': '10s',
+        'CONSUL_BASIC_ROUTE': 'sp/v1/'
+    }
+
+    # The client is built once, when this class body runs. Host precedence:
+    # CONSUL_LOCATION, then localhost when SP_DEBUG is set, then the default.
+    if os.getenv('CONSUL_LOCATION'):
+        _consul_host = os.getenv('CONSUL_LOCATION')
+    elif os.getenv('SP_DEBUG'):
+        _consul_host = 'localhost'
+    else:
+        _consul_host = _DEFAULT_CONSUL_CONFIG['CONSUL_HOST']
+    consul = consul.Consul(host=_consul_host, port=_DEFAULT_CONSUL_CONFIG['CONSUL_PORT'])
+
+    @classmethod
+    def register_service(cls, app_id: str, host: str, port: int):
+        """Register the service as 'pe' with an HTTP health check on its root URL.
+
+        :param app_id: used as the Consul service id and as one of the tags
+        :param host: address registered for the service and probed by the check
+        :param port: port registered for the service and probed by the check
+        """
+        # TODO: add service tags
+        health_check = Check.http(url='http://' + host + ':' + str(port),
+                                  interval=cls._DEFAULT_CONSUL_CONFIG['HEALTHCHECK_INTERVAL'])
+        cls.consul.agent.service.register(name='pe', service_id=app_id, address=host, port=port,
+                                          tags=['pe', 'python', app_id], check=health_check)
+
+    @classmethod
+    def register_config(cls, app_id: str, env_key: str, default, description: str, configuration_scope=None, is_password=None):
+        """Store one config item under the app's key unless the key already exists.
+
+        :param app_id: application id, part of the Consul key
+        :param env_key: environment variable name, also the last key segment
+        :param default: value used when the variable is unset or empty
+        :param description: free-text description stored with the item
+        :param configuration_scope: defaults to 'CONTAINER_STARTUP_CONFIG'
+        :param is_password: defaults to False
+        """
+        config_item = ConfigItem()
+        config_item.value = cls.__env_or_default(env_key, default)
+        config_item.value_type = cls.__check_default_type(config_item.value)
+        config_item.description = description
+        # TODO: configuration_scope needed? Currently manually set
+        config_item.configuration_scope = (
+            configuration_scope if configuration_scope is not None else 'CONTAINER_STARTUP_CONFIG')
+        config_item.is_password = is_password if is_password is not None else False
+
+        key = cls.__get_consul_key(app_id, env_key)
+        # kv.get returns an (index, data) tuple; only ``data`` is None for a
+        # missing key, so testing the tuple itself would never write anything.
+        _, data = cls.consul.kv.get(key)
+        # TODO: update existing keys?
+        if data is None:
+            cls.consul.kv.put(key, config_item.to_json())
+
+    @classmethod
+    def __get_consul_key(cls, app_id, key):
+        """Return '<basic route><app_id>/<key>'."""
+        return cls._DEFAULT_CONSUL_CONFIG['CONSUL_BASIC_ROUTE'] + app_id + '/' + key
+
+    @staticmethod
+    def __env_or_default(key, default):
+        """Return the non-empty environment value for ``key``, else ``default``."""
+        if key is None:
+            return default
+        return os.getenv(key) or default
+
+    @staticmethod
+    def __check_default_type(value) -> str:
+        """Map a Python value to its XSD type name (None for other types)."""
+        # bool is tested first: it subclasses int and would be reported as integer.
+        if isinstance(value, bool):
+            return 'xs:boolean'
+        elif isinstance(value, int):
+            return 'xs:integer'
+        elif isinstance(value, str):
+            return 'xs:string'
+        elif isinstance(value, float):
+            return 'xs:float'