You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/08/01 22:03:42 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-185] add consul service and config registration in python wrapper

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8478a5f  [STREAMPIPES-185] add consul service and config registration in python wrapper
8478a5f is described below

commit 8478a5fd823f0186c826760426e4204b2b17027b
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Sun Aug 2 00:03:25 2020 +0200

    [STREAMPIPES-185] add consul service and config registration in python wrapper
---
 streampipes-wrapper-python/requirements.txt        |   2 +
 streampipes-wrapper-python/setup.py                |   6 +-
 streampipes-wrapper-python/streampipes/api.py      |  25 +++--
 .../streampipes/{configuration.py => banner.py}    |   9 +-
 streampipes-wrapper-python/streampipes/core.py     |  41 ++++----
 streampipes-wrapper-python/streampipes/manager.py  |   4 +-
 .../streampipes/model/__init__.py                  |   0
 .../{helper.py => model/config_item.py}            |  25 +++--
 .../streampipes/model/processor_config.py          |  50 ++++++++++
 .../streampipes/utils/__init__.py                  |   0
 .../streampipes/utils/register.py                  | 109 +++++++++++++++++++++
 11 files changed, 223 insertions(+), 48 deletions(-)

diff --git a/streampipes-wrapper-python/requirements.txt b/streampipes-wrapper-python/requirements.txt
index 1008c86..27e0195 100644
--- a/streampipes-wrapper-python/requirements.txt
+++ b/streampipes-wrapper-python/requirements.txt
@@ -6,3 +6,5 @@ Jinja2==2.11.2
 MarkupSafe==1.1.1
 waitress==1.4.4
 Werkzeug==1.0.1
+
+setuptools~=49.2.0
\ No newline at end of file
diff --git a/streampipes-wrapper-python/setup.py b/streampipes-wrapper-python/setup.py
index dcef38c..63b792f 100644
--- a/streampipes-wrapper-python/setup.py
+++ b/streampipes-wrapper-python/setup.py
@@ -16,7 +16,7 @@
 #
 """"""
 
-from setuptools import setup
+from setuptools import setup, find_packages
 
 import io
 import os
@@ -28,7 +28,8 @@ with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') a
 setup(
     name='apache-streampipes-python',
     version='0.67.0-SNAPSHOT',
-    packages=["streampipes"],
+    #packages=["streampipes"],
+    packages=find_packages(),
     url='https://github.com/apache/incubator-streampipes',
     license='https://www.apache.org/licenses/LICENSE-2.0',
     license_files=["LICENSE", "NOTICE"],
@@ -41,6 +42,7 @@ setup(
         'confluent-kafka==1.4.2',
         'Flask==1.1.2',
         'waitress==1.4.4',
+        'python-consul==1.1.0'
     ],
     tests_require=[],
     python_requires='>=3.5',
diff --git a/streampipes-wrapper-python/streampipes/api.py b/streampipes-wrapper-python/streampipes/api.py
index f6baf08..36f3735 100644
--- a/streampipes-wrapper-python/streampipes/api.py
+++ b/streampipes-wrapper-python/streampipes/api.py
@@ -15,10 +15,11 @@
 # limitations under the License.
 #
 """ API endpoints """
-from flask import request, jsonify, Flask
+import json
+
+from flask import request, jsonify, Flask, Response, make_response
 from waitress import serve
 
-from streampipes.configuration import flask_host, port
 from streampipes.manager import ProcessorDispatcher
 
 
@@ -28,23 +29,29 @@ class EndpointAction(object):
 
     def __call__(self, *args, **kwargs):
         """ call corresponding handler method """
-        response = self.action(**request.get_json())
-        return jsonify(response)
+        if request.get_json() is not None:
+            resp = self.action(**request.get_json())
+        else:
+            resp = self.action()
+
+        return make_response(jsonify(resp), 200)
 
 
 class API(object):
     """ EventProcessorAPI contains relevant RESTful endpoints to start and stop """
     app = None
 
-    def __init__(self, name='python-processor'):
-        self.app = Flask(name)
+    def __init__(self, port: int):
+        self.app = Flask('python-processor')
+        self.port = port
         self.add_endpoints()
 
     def run(self):
-        serve(self.app, host=flask_host, port=port)
+        serve(self.app, host='0.0.0.0', port=self.port)
 
     def add_endpoints(self):
         """ define and add event processor API endpoints """
+        self.add_endpoint(endpoint='/', endpoint_name='/', methods=['GET'], handler=self.welcome)
         self.add_endpoint(endpoint='/invoke', endpoint_name='/invoke', methods=['POST'], handler=self.invoke)
         self.add_endpoint(endpoint='/detach', endpoint_name='/detach', methods=['POST'], handler=self.detach)
 
@@ -52,6 +59,10 @@ class API(object):
         self.app.add_url_rule(endpoint, endpoint_name, EndpointAction(handler), methods=methods)
 
     @staticmethod
+    def welcome():
+        return {'welcome': 'hello-world!'}
+
+    @staticmethod
     def invoke(**kwargs):
         """ Receives invocation graph from pipeline management in the backend
 
diff --git a/streampipes-wrapper-python/streampipes/configuration.py b/streampipes-wrapper-python/streampipes/banner.py
similarity index 87%
rename from streampipes-wrapper-python/streampipes/configuration.py
rename to streampipes-wrapper-python/streampipes/banner.py
index 719b6e1..5bb8e75 100644
--- a/streampipes-wrapper-python/streampipes/configuration.py
+++ b/streampipes-wrapper-python/streampipes/banner.py
@@ -14,14 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-"""config file"""
-import os
+"""banner"""
 
-flask_host = os.getenv('FLASK_HOST', '0.0.0.0')
-port = os.getenv('SP_PORT', 5000)
-kafka_consumer_thread = 'kafka_consumer_thread'
-
-banner="""\
+banner = """\
  _______ __                              ______ __
 |     __|  |_.----.-----.---.-.--------.|   __ \__|.-----.-----.-----.
 |__     |   _|   _|  -__|  _  |        ||    __/  ||  _  |  -__|__ --|
diff --git a/streampipes-wrapper-python/streampipes/core.py b/streampipes-wrapper-python/streampipes/core.py
index c31cd9e..08bb665 100644
--- a/streampipes-wrapper-python/streampipes/core.py
+++ b/streampipes-wrapper-python/streampipes/core.py
@@ -17,25 +17,24 @@
 """contains relevant base classes"""
 import json
 import logging
+import threading
 from abc import ABC, abstractmethod
 from confluent_kafka.admin import AdminClient
-
-from streampipes.api import API
-from streampipes.configuration import banner, kafka_consumer_thread
-from streampipes.helper import threaded
 from confluent_kafka import Producer, Consumer
+from streampipes.api import API
+from streampipes.banner import banner
+from streampipes.model.processor_config import Config
+from streampipes.utils.register import ConsulUtils
 
 
-class StandaloneSubmitter(ABC):
-    @classmethod
-    def init(cls):
-        cls._load_banner()
-        cls.api = API()
-        cls.api.run()
-
+class StandaloneModelSubmitter(ABC):
     @classmethod
-    def _load_banner(cls):
+    def init(cls, config: Config):
         print(banner)
+        api = API(port=config.port)
+        threading.Thread(target=api.run()).start()
+
+        ConsulUtils.register_service(app_id=config.app_id, host=config.host, port=int(config.port))
 
 
 class EventProcessor(ABC):
@@ -82,7 +81,9 @@ class EventProcessor(ABC):
 
     def init(self):
         self.logger.info('start processor {}'.format(self.invocation_id))
-        self._threads[kafka_consumer_thread] = self._consume(self._input_topics)
+        thread = threading.Thread(target=self.__consume, name=self.invocation_id)
+        thread.start()
+        self._threads['kafka'] = thread
 
     def active_threads(self):
         return self._threads
@@ -108,16 +109,16 @@ class EventProcessor(ABC):
         """ on_detach is called when processor is stopped """
         pass
 
-    def _on_event(self, event):
+    def __on_event(self, event):
         result = self.on_event(event)
 
         if result is not None:
-            self._produce(result)
+            self.__produce(result)
 
-    @threaded
-    def _consume(self, topics):
+    #@threaded
+    def __consume(self):
         """ retrieve events from kafka """
-        self._consumer.subscribe(topics=[topics])
+        self._consumer.subscribe(topics=[self._input_topics])
         self._running = True
 
         while self._running:
@@ -141,9 +142,9 @@ class EventProcessor(ABC):
                     self.logger.info("Not a valid json {}".format(e))
                     continue
 
-                self._on_event(event)
+                self.__on_event(event)
 
-    def _produce(self, result):
+    def __produce(self, result):
         """ send events to kafka """
         event = json.dumps(result).encode('utf-8')
         try:
diff --git a/streampipes-wrapper-python/streampipes/manager.py b/streampipes-wrapper-python/streampipes/manager.py
index 64a2a4e..e151e8d 100644
--- a/streampipes-wrapper-python/streampipes/manager.py
+++ b/streampipes-wrapper-python/streampipes/manager.py
@@ -18,8 +18,6 @@
 import logging
 from abc import ABC
 
-from streampipes.configuration import kafka_consumer_thread
-
 
 class Declarer(ABC):
     """ EventProcessorManager holds running processor instances """
@@ -66,7 +64,7 @@ class ProcessorDispatcher(ABC):
             processor = cls._running_instances[invocation_id]
             active_threads = processor.active_threads()
             processor.stop()
-            active_threads[kafka_consumer_thread].join()
+            active_threads['kafka'].join()
 
             del processor
             cls._running_instances.pop(invocation_id)
diff --git a/streampipes-wrapper-python/streampipes/model/__init__.py b/streampipes-wrapper-python/streampipes/model/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/streampipes-wrapper-python/streampipes/helper.py b/streampipes-wrapper-python/streampipes/model/config_item.py
similarity index 62%
rename from streampipes-wrapper-python/streampipes/helper.py
rename to streampipes-wrapper-python/streampipes/model/config_item.py
index 44a6b44..8530db7 100644
--- a/streampipes-wrapper-python/streampipes/helper.py
+++ b/streampipes-wrapper-python/streampipes/model/config_item.py
@@ -14,15 +14,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-"""some helper methods"""
-import threading
+import json
 
 
-def threaded(func):
-    def wrapper(*args, **kwargs):
-        thread_name = args[0]._invocation_id
-        thread = threading.Thread(target=func, args=args, kwargs=kwargs, name=thread_name)
-        thread.start()
-        return thread
+class ConfigItem(object):
+    def __init__(self):
+        self.value = None
+        self.value_type = None
+        self.description = None
+        self.configuration_scope = None
+        self.is_password = None
 
-    return wrapper
\ No newline at end of file
+    def to_json(self):
+        d = {}
+        for k,v in self.__dict__.items():
+            elements = k.split('_')
+            camel_case = elements[0] + ''.join(x.title() for x in elements[1:])
+            d[camel_case] = v
+
+        return json.dumps(d)
\ No newline at end of file
diff --git a/streampipes-wrapper-python/streampipes/model/processor_config.py b/streampipes-wrapper-python/streampipes/model/processor_config.py
new file mode 100644
index 0000000..702e206
--- /dev/null
+++ b/streampipes-wrapper-python/streampipes/model/processor_config.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import os
+
+from streampipes.utils.register import ConsulUtils
+
+
+class Config(object):
+    def __init__(self, app_id: str) -> None:
+        self.logger = logging.getLogger(__name__)
+        self.app_id = app_id
+        self.host: str
+        self.port: int
+        self.service: str
+
+    def register(self, type: str, env_key: str, default, description: str, configuration_scope=None, is_password=None):
+
+        if type is 'host':
+            self.host = self.__env_or_default(env_key, default)
+        elif type is 'port':
+            self.port = self.__env_or_default(env_key, default)
+        elif type is 'service':
+            self.service = self.__env_or_default(env_key, default)
+
+        ConsulUtils.register_config(self.app_id, env_key, default, description, configuration_scope, is_password)
+
+    @staticmethod
+    def __env_or_default(key, default):
+        if key is not None:
+            if os.getenv(key):
+                return os.getenv(key)
+            else:
+                return default
+        else:
+            return default
\ No newline at end of file
diff --git a/streampipes-wrapper-python/streampipes/utils/__init__.py b/streampipes-wrapper-python/streampipes/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/streampipes-wrapper-python/streampipes/utils/register.py b/streampipes-wrapper-python/streampipes/utils/register.py
new file mode 100644
index 0000000..117e102
--- /dev/null
+++ b/streampipes-wrapper-python/streampipes/utils/register.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+""""""
+import os
+import consul
+from consul import Check
+
+from streampipes.model.config_item import ConfigItem
+
+
+class ConsulUtils(object):
+    _DEFAULT_CONSUL_CONFIG = {
+        'CONSUL_HOST': 'consul',
+        'CONSUL_PORT': 8500,
+        'HEALTHCHECK_INTERVAL': '10s',
+        'CONSUL_BASIC_ROUTE': 'sp/v1/'
+    }
+
+    if os.getenv('CONSUL_LOCATION'):
+        consul = consul.Consul(host=os.getenv('CONSUL_LOCATION'), port=_DEFAULT_CONSUL_CONFIG['CONSUL_PORT'])
+    elif os.getenv('SP_DEBUG'):
+        consul = consul.Consul(host='localhost', port=_DEFAULT_CONSUL_CONFIG['CONSUL_PORT'])
+    else:
+        consul = consul.Consul(host=_DEFAULT_CONSUL_CONFIG['CONSUL_HOST'], port=_DEFAULT_CONSUL_CONFIG['CONSUL_PORT'])
+
+    @classmethod
+    def register_service(cls, app_id: str, host: str, port: int):
+        # TODO: add service tags
+        cls.consul.agent.service.register(name='pe',
+                                          service_id=app_id,
+                                          address=host,
+                                          port=port,
+                                          tags=['pe', 'python', app_id],
+                                          check=Check.http(url='http://' + host + ':' + str(port),
+                                                           interval=cls._DEFAULT_CONSUL_CONFIG['HEALTHCHECK_INTERVAL']))
+
+    @classmethod
+    def register_config(cls, app_id: str, env_key: str, default, description: str, configuration_scope=None, is_password=None):
+        config_item = ConfigItem()
+
+        if env_key is not None:
+            if os.getenv(env_key):
+                env_value = os.getenv(env_key)
+                config_item.value = env_value
+                config_item.value_type = cls.__check_default_type(env_value)
+            else:
+                config_item.value = default
+                config_item.value_type = cls.__check_default_type(default)
+        else:
+            config_item.value = default
+            config_item.value_type = cls.__check_default_type(default)
+
+        config_item.description = description
+
+        if configuration_scope is not None:
+            config_item.configuration_scope = configuration_scope
+        else:
+            # TODO: configuration_scope needed? Currently manually set
+            config_item.configuration_scope = 'CONTAINER_STARTUP_CONFIG'
+
+        if is_password is not None:
+            config_item.is_password = is_password
+        else:
+            config_item.is_password = False
+
+        key = cls.__get_consul_key(app_id, env_key)
+        data = cls.consul.kv.get(key)
+        # TODO: update existing keys?
+        if data is None:
+            cls.consul.kv.put(key, config_item.to_json())
+
+    @classmethod
+    def __get_consul_key(cls, app_id, key):
+        return cls._DEFAULT_CONSUL_CONFIG['CONSUL_BASIC_ROUTE'] + app_id + '/' + key
+
+    @staticmethod
+    def __env_or_default(key, default):
+        if key is not None:
+            if os.getenv(key):
+                return os.getenv(key)
+            else:
+                return default
+        else:
+            return default
+
+    @staticmethod
+    def __check_default_type(value) -> str:
+        if isinstance(value, int):
+            return 'xs:integer'
+        elif isinstance(value, str):
+            return 'xs:string'
+        elif isinstance(value, bool):
+            return 'xs:boolean'
+        elif isinstance(value, float):
+            return 'xs:float'