You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/08/03 19:56:05 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-187] defined api endpoint stubs

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new bcfbfdb  [STREAMPIPES-187] defined api endpoint stubs
bcfbfdb is described below

commit bcfbfdb9578bb07b62fe4af123f20bc5246a5cda
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Aug 3 21:55:41 2020 +0200

    [STREAMPIPES-187] defined api endpoint stubs
---
 streampipes-wrapper-python/requirements.txt        |  13 ++-
 streampipes-wrapper-python/setup.py                |   4 +-
 .../streampipes/api/resources/base.py              | 128 +++++++++++++++++++++
 .../api/resources/{welcome.py => decorator.py}     |  25 ++--
 .../api/resources/{sepa.py => processor.py}        |  23 ++--
 .../streampipes/api/resources/welcome.py           |  24 ++--
 streampipes-wrapper-python/streampipes/api/rest.py |  29 +++--
 streampipes-wrapper-python/streampipes/core.py     |   1 -
 .../streampipes/model/pipeline_element_config.py   |  30 ++---
 .../streampipes/utils/register.py                  |  37 ++++--
 10 files changed, 234 insertions(+), 80 deletions(-)

diff --git a/streampipes-wrapper-python/requirements.txt b/streampipes-wrapper-python/requirements.txt
index 27e0195..7ae7bd7 100644
--- a/streampipes-wrapper-python/requirements.txt
+++ b/streampipes-wrapper-python/requirements.txt
@@ -1,10 +1,17 @@
+bjoern==3.1.0
+certifi==2020.6.20
+chardet==3.0.4
 click==7.1.2
 confluent-kafka==1.4.2
 Flask==1.1.2
+Flask-Classful==0.14.2
+idna==2.10
 itsdangerous==1.1.0
 Jinja2==2.11.2
 MarkupSafe==1.1.1
+python-consul==1.1.0
+requests==2.24.0
+six==1.15.0
+urllib3==1.25.10
 waitress==1.4.4
-Werkzeug==1.0.1
-
-setuptools~=49.2.0
\ No newline at end of file
+Werkzeug==1.0.1
\ No newline at end of file
diff --git a/streampipes-wrapper-python/setup.py b/streampipes-wrapper-python/setup.py
index 63b792f..ea83329 100644
--- a/streampipes-wrapper-python/setup.py
+++ b/streampipes-wrapper-python/setup.py
@@ -28,7 +28,6 @@ with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') a
 setup(
     name='apache-streampipes-python',
     version='0.67.0-SNAPSHOT',
-    #packages=["streampipes"],
     packages=find_packages(),
     url='https://github.com/apache/incubator-streampipes',
     license='https://www.apache.org/licenses/LICENSE-2.0',
@@ -41,7 +40,8 @@ setup(
     install_requires=[
         'confluent-kafka==1.4.2',
         'Flask==1.1.2',
-        'waitress==1.4.4',
+        'flask-classful==0.14.2',
+        'bjoern==3.1.0',
         'python-consul==1.1.0'
     ],
     tests_require=[],
diff --git a/streampipes-wrapper-python/streampipes/api/resources/base.py b/streampipes-wrapper-python/streampipes/api/resources/base.py
new file mode 100644
index 0000000..8faeec1
--- /dev/null
+++ b/streampipes-wrapper-python/streampipes/api/resources/base.py
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import abc
+import os
+from flask import jsonify, make_response
+from flask_classful import FlaskView, route
+from streampipes.api.resources.decorator import consumes
+
+
+class Element(FlaskView):
+    __metaclass__ = abc.ABC
+
+    @route('/<element_id>', methods=['GET'])
+    def get_description(self, element_id: str):
+        # TODO: get element description
+        # TODO: get element from declarer
+        resp = {'element_id': element_id, 'description': 'dummy description'}
+        return make_response(jsonify(resp), 200)
+
+    @route('/<element_id>/assets', methods=['GET'])
+    def get_assets(self, element_id: str):
+        # TODO: send zipped asset
+        resp = {'element_id': element_id, 'asset': 'dummy asset'}
+        return make_response(jsonify(resp), 200)
+
+    @route('/<element_id>/assets/icon', methods=['GET'])
+    def get_assets_icon(self, element_id: str):
+        # TODO: return icon as byte array
+        resp = {'element_id': element_id, 'asset': 'dummy asset', 'icon': 'dummy icon'}
+        return make_response(jsonify(resp), 200)
+
+    @route('/<element_id>/assets/documentation', methods=['GET'])
+    def get_assets_docs(self, element_id: str):
+        # TODO: return documentation asset
+        resp = {'element_id': element_id, 'asset': 'dummy asset', 'docs': 'dummy docs'}
+        return make_response(jsonify(resp), 200)
+
+    @abc.abstractmethod
+    def get_element_declarers(self):
+        raise NotImplementedError()
+
+    def get_declarer_by_id(self, element_id: str):
+        return self.get_element_declarers().get(element_id=element_id)
+
+    @classmethod
+    def _get_json_ld(cls):
+        pass
+
+    @classmethod
+    def _make_grounding(cls):
+        pass
+
+    @classmethod
+    def _make_icon_path(cls, element_id: str):
+        return cls._make_path(element_id, 'icon.png')
+
+    @classmethod
+    def _make_documentation_path(cls, element_id: str):
+        return cls._make_path(element_id, 'documentation.md')
+
+    @classmethod
+    def _make_path(cls, element_id: str, asset_appendix: str):
+        return element_id + '/' + asset_appendix
+
+
+class InvocableElement(Element):
+    __metaclass__ = abc.ABC
+
+    @route('/<element_id>', methods=['POST'])
+    @consumes(media_type=['application/json'])
+    def invoke_runtime(self, element_id: str):
+        # TODO: parse invocation graph
+        # payload = request.json
+        resp = {'element_id': element_id, 'status': 'sucess'}
+        return make_response(jsonify(resp), 200)
+
+    @route('/<element_id>/configurations', methods=['POST'])
+    @consumes(media_type=['application/json'])
+    def fetch_configurations(self, element_id: str):
+        # payload = request.json
+        resp = {'element_id': element_id, 'config': 'sucess'}
+        return make_response(jsonify(resp), 200)
+
+    @route('/<element_id>/output', methods=['POST'])
+    @consumes(media_type=['application/json'])
+    def fetch_output_configurations(self, element_id: str):
+        # payload = request.json
+        resp = {'element_id': element_id, 'output': 'sucess'}
+        return make_response(jsonify(resp), 200)
+
+    @route('/<element_id>/<running_instance_id>', methods=['DELETE'])
+    def detach(self, element_id: str, running_instance_id: str):
+        resp = {'element_id': element_id, 'running_instance_id': running_instance_id}
+        return make_response(jsonify(resp), 200)
+
+    @abc.abstractmethod
+    def get_element_declarers(self):
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_instance_id(self, uri: str, element_id: str):
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_extractor(self, graph):
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def create_grounding_debug_information(self, graph):
+        raise NotImplementedError()
+
+    @staticmethod
+    def _is_debug():
+        return True if os.getenv('SP_DEBUG') == 'true' else False
diff --git a/streampipes-wrapper-python/streampipes/api/resources/welcome.py b/streampipes-wrapper-python/streampipes/api/resources/decorator.py
similarity index 65%
copy from streampipes-wrapper-python/streampipes/api/resources/welcome.py
copy to streampipes-wrapper-python/streampipes/api/resources/decorator.py
index 227e939..2b805a7 100644
--- a/streampipes-wrapper-python/streampipes/api/resources/welcome.py
+++ b/streampipes-wrapper-python/streampipes/api/resources/decorator.py
@@ -14,18 +14,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from flask import Blueprint, make_response, jsonify, request
+import functools
+from flask import request, abort
 
-welcome_blueprint = Blueprint('welcome', __name__)
-
-
-@welcome_blueprint.route('/', methods=['GET'])
-def get_welcome_page_html():
-
-    if request.content_type == 'application/json':
-        resp = {'welcome': 'hello-world!'}
-        return make_response(jsonify(resp), 200)
-
-    elif request.content_type == 'text/html':
-        return "<p>Got it!</p>"
 
+def consumes(media_type):
+    def consumes_decorator(f):
+        @functools.wraps(f)
+        def decorated_function(*args, **kwargs):
+            for entry in media_type:
+                if request.content_type == entry:
+                    return f(*args, **kwargs)
+            abort(404)
+        return decorated_function
+    return consumes_decorator
diff --git a/streampipes-wrapper-python/streampipes/api/resources/sepa.py b/streampipes-wrapper-python/streampipes/api/resources/processor.py
similarity index 62%
rename from streampipes-wrapper-python/streampipes/api/resources/sepa.py
rename to streampipes-wrapper-python/streampipes/api/resources/processor.py
index f7eeefb..14602f6 100644
--- a/streampipes-wrapper-python/streampipes/api/resources/sepa.py
+++ b/streampipes-wrapper-python/streampipes/api/resources/processor.py
@@ -14,19 +14,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from flask import Blueprint, request, make_response, jsonify
-from streampipes.manager import ProcessorDispatcher
+from streampipes.api.resources.base import InvocableElement
 
-sepa_blueprint = Blueprint('sepa', __name__)
 
+class SepaElementResource(InvocableElement):
 
-@sepa_blueprint.route('/invoke', methods=['POST'])
-def invoke():
-    resp = ProcessorDispatcher.start(**request.json)
-    return make_response(jsonify(resp), 200)
+    def get_instance_id(self, uri: str, element_id: str):
+        pass
 
+    def get_element_declarers(self):
+        pass
+
+    def get_extractor(self, graph):
+        pass
+
+    def create_grounding_debug_information(self, graph):
+        pass
 
-@sepa_blueprint.route('/detach', methods=['POST'])
-def detach():
-    resp = ProcessorDispatcher.stop(**request.json)
-    return make_response(jsonify(resp), 200)
diff --git a/streampipes-wrapper-python/streampipes/api/resources/welcome.py b/streampipes-wrapper-python/streampipes/api/resources/welcome.py
index 227e939..3f531d7 100644
--- a/streampipes-wrapper-python/streampipes/api/resources/welcome.py
+++ b/streampipes-wrapper-python/streampipes/api/resources/welcome.py
@@ -14,18 +14,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from flask import Blueprint, make_response, jsonify, request
+from flask import make_response, jsonify, request
+from flask_classful import FlaskView
+from streampipes.api.resources.decorator import consumes
 
-welcome_blueprint = Blueprint('welcome', __name__)
 
+class WelcomeResource(FlaskView):
 
-@welcome_blueprint.route('/', methods=['GET'])
-def get_welcome_page_html():
-
-    if request.content_type == 'application/json':
-        resp = {'welcome': 'hello-world!'}
-        return make_response(jsonify(resp), 200)
-
-    elif request.content_type == 'text/html':
-        return "<p>Got it!</p>"
-
+    @consumes(media_type=['application/json','text/html'])
+    def index(self):
+        if request.content_type == 'application/json':
+            resp = {'welcome': 'hello-world!'}
+            return make_response(jsonify(resp), 200)
+        elif request.content_type == 'text/html':
+            return "<p>Got it!</p>"
+        return 'welcome!'
diff --git a/streampipes-wrapper-python/streampipes/api/rest.py b/streampipes-wrapper-python/streampipes/api/rest.py
index 702011e..8d15b8d 100644
--- a/streampipes-wrapper-python/streampipes/api/rest.py
+++ b/streampipes-wrapper-python/streampipes/api/rest.py
@@ -15,32 +15,31 @@
 # limitations under the License.
 #
 """ API endpoints """
-import sys
 import threading
 
+import bjoern
 from flask import Flask
-from waitress import serve
-
-
-class FlaskProductionConfig(object):
-    DEBUG = False
-    DEVELOPMENT = False
+from streampipes.api.resources.processor import SepaElementResource
+from streampipes.api.resources.welcome import WelcomeResource
 
 
 class PipelineElementApi(object):
+    _FLASK_CONFIG = {
+        'DEBUG': False,
+        'DEVELOPMENT': False
+    }
 
     def __init__(self):
         self.app = Flask(__name__, instance_relative_config=False)
-        self.app.config.from_object(FlaskProductionConfig)
+        self.app.config.from_object(self._FLASK_CONFIG)
 
         with self.app.app_context():
-            # import endpoints
-            from streampipes.api.resources import welcome
-            from streampipes.api.resources import sepa
 
-            # register blueprints
-            self.app.register_blueprint(welcome.welcome_blueprint)
-            self.app.register_blueprint(sepa.sepa_blueprint)
+            # register resources
+            SepaElementResource.register(self.app, route_base='/', route_prefix='sepa')
+            WelcomeResource.register(self.app, route_base='/')
 
     def run(self, port: int):
-        threading.Thread(target=serve, kwargs={'app': self.app, 'host': '0.0.0.0', 'port': port}).start()
+        print('serving API via bjoern WSGI server ... {}:{}'.format('0.0.0.0', port))
+        threading.Thread(target=bjoern.run, args=(self.app,), kwargs={'host': '0.0.0.0', 'port': int(port)}).start()
+
diff --git a/streampipes-wrapper-python/streampipes/core.py b/streampipes-wrapper-python/streampipes/core.py
index 4fd2f4d..f8140a3 100644
--- a/streampipes-wrapper-python/streampipes/core.py
+++ b/streampipes-wrapper-python/streampipes/core.py
@@ -21,7 +21,6 @@ import threading
 from abc import ABC, abstractmethod
 from confluent_kafka.admin import AdminClient
 from confluent_kafka import Producer, Consumer
-
 from streampipes.api.rest import PipelineElementApi
 from streampipes.base.banner import banner
 from streampipes.model.pipeline_element_config import Config
diff --git a/streampipes-wrapper-python/streampipes/model/pipeline_element_config.py b/streampipes-wrapper-python/streampipes/model/pipeline_element_config.py
index af0a40e..0976967 100644
--- a/streampipes-wrapper-python/streampipes/model/pipeline_element_config.py
+++ b/streampipes-wrapper-python/streampipes/model/pipeline_element_config.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import logging
 import os
 
 from streampipes.utils.register import ConsulUtils
@@ -22,29 +21,34 @@ from streampipes.utils.register import ConsulUtils
 
 class Config(object):
     def __init__(self, app_id: str) -> None:
-        self.logger = logging.getLogger(__name__)
+        if not app_id:
+            raise ValueError
+
         self.app_id = app_id
-        self.host: str
-        self.port: int
-        self.service: str
+        self.host: str = ""
+        self.port: int = 0
+        self.service: str = ""
 
     def register(self, type: str, env_key: str, default, description: str, configuration_scope=None, is_password=None):
+        if not type:
+            raise ValueError
+        if not env_key:
+            raise ValueError
+        if not default:
+            raise ValueError
 
         if type is 'host':
-            self.host = self.__env_or_default(env_key, default)
+            self.host = self._env_or_default(env_key, default)
         elif type is 'port':
-            self.port = self.__env_or_default(env_key, default)
+            self.port = self._env_or_default(env_key, default)
         elif type is 'service':
-            self.service = self.__env_or_default(env_key, default)
+            self.service = self._env_or_default(env_key, default)
 
         ConsulUtils().register_config(self.app_id, env_key, default, description, configuration_scope, is_password)
 
     @staticmethod
-    def __env_or_default(key, default):
+    def _env_or_default(key, default):
         if key is not None:
             if os.getenv(key):
                 return os.getenv(key)
-            else:
-                return default
-        else:
-            return default
+        return default
diff --git a/streampipes-wrapper-python/streampipes/utils/register.py b/streampipes-wrapper-python/streampipes/utils/register.py
index 5ab3242..01db4d6 100644
--- a/streampipes-wrapper-python/streampipes/utils/register.py
+++ b/streampipes-wrapper-python/streampipes/utils/register.py
@@ -18,7 +18,6 @@
 import os
 import consul
 from consul import Check
-
 from streampipes.model.config_item import ConfigItem
 
 
@@ -31,9 +30,17 @@ class ConsulUtils(object):
     }
 
     def __init__(self):
-        self.consul = self.__consul()
+        self.consul = self._consul()
 
     def register_service(self, app_id: str, host: str, port: int):
+        if not app_id:
+            raise ValueError
+        if not host:
+            raise ValueError
+        if not port:
+            raise ValueError
+
+        print('register service at consul for key: {}'.format(app_id))
         self.consul.agent.service.register(name='pe',
                                            service_id=app_id,
                                            address=host,
@@ -46,19 +53,28 @@ class ConsulUtils(object):
 
     def register_config(self, app_id: str, env_key: str, default, description: str, configuration_scope=None,
                         is_password=None):
+        if not app_id:
+            raise ValueError
+        if not env_key:
+            raise ValueError
+        if not default:
+            raise ValueError
+        if not description:
+            raise ValueError
+
         config_item = ConfigItem()
 
         if env_key is not None:
             if os.getenv(env_key):
                 env_value = os.getenv(env_key)
                 config_item.value = env_value
-                config_item.value_type = self.__check_default_type(env_value)
+                config_item.value_type = self._check_default_type(env_value)
             else:
                 config_item.value = default
-                config_item.value_type = self.__check_default_type(default)
+                config_item.value_type = self._check_default_type(default)
         else:
             config_item.value = default
-            config_item.value_type = self.__check_default_type(default)
+            config_item.value_type = self._check_default_type(default)
 
         config_item.description = description
 
@@ -73,17 +89,18 @@ class ConsulUtils(object):
         else:
             config_item.is_password = False
 
-        key = self.__get_consul_key(app_id, env_key)
+        key = self._get_consul_key(app_id, env_key)
         index, data = self.consul.kv.get(key)
         # TODO: update existing keys?
         if data is None:
+            print('register config item at consul for key: {}'.format(env_key))
             self.consul.kv.put(key, config_item.to_json())
 
-    def __get_consul_key(self, app_id, key):
+    def _get_consul_key(self, app_id, key):
         return self._DEFAULT_CONSUL_CONFIG['CONSUL_BASIC_ROUTE'] + app_id + '/' + key
 
     @staticmethod
-    def __env_or_default(key, default):
+    def _env_or_default(key, default):
         if key is not None:
             if os.getenv(key):
                 return os.getenv(key)
@@ -93,7 +110,7 @@ class ConsulUtils(object):
             return default
 
     @staticmethod
-    def __check_default_type(value) -> str:
+    def _check_default_type(value) -> str:
         if isinstance(value, int):
             return 'xs:integer'
         elif isinstance(value, str):
@@ -103,7 +120,7 @@ class ConsulUtils(object):
         elif isinstance(value, float):
             return 'xs:float'
 
-    def __consul(self):
+    def _consul(self):
         if os.getenv('CONSUL_LOCATION'):
             return consul.Consul(host=os.getenv('CONSUL_LOCATION'),
                                  port=self._DEFAULT_CONSUL_CONFIG['CONSUL_PORT'])