You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/08/03 19:56:05 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-187]
defined api endpoint stubs
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new bcfbfdb [STREAMPIPES-187] defined api endpoint stubs
bcfbfdb is described below
commit bcfbfdb9578bb07b62fe4af123f20bc5246a5cda
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Aug 3 21:55:41 2020 +0200
[STREAMPIPES-187] defined api endpoint stubs
---
streampipes-wrapper-python/requirements.txt | 13 ++-
streampipes-wrapper-python/setup.py | 4 +-
.../streampipes/api/resources/base.py | 128 +++++++++++++++++++++
.../api/resources/{welcome.py => decorator.py} | 25 ++--
.../api/resources/{sepa.py => processor.py} | 23 ++--
.../streampipes/api/resources/welcome.py | 24 ++--
streampipes-wrapper-python/streampipes/api/rest.py | 29 +++--
streampipes-wrapper-python/streampipes/core.py | 1 -
.../streampipes/model/pipeline_element_config.py | 30 ++---
.../streampipes/utils/register.py | 37 ++++--
10 files changed, 234 insertions(+), 80 deletions(-)
diff --git a/streampipes-wrapper-python/requirements.txt b/streampipes-wrapper-python/requirements.txt
index 27e0195..7ae7bd7 100644
--- a/streampipes-wrapper-python/requirements.txt
+++ b/streampipes-wrapper-python/requirements.txt
@@ -1,10 +1,17 @@
+bjoern==3.1.0
+certifi==2020.6.20
+chardet==3.0.4
click==7.1.2
confluent-kafka==1.4.2
Flask==1.1.2
+Flask-Classful==0.14.2
+idna==2.10
itsdangerous==1.1.0
Jinja2==2.11.2
MarkupSafe==1.1.1
+python-consul==1.1.0
+requests==2.24.0
+six==1.15.0
+urllib3==1.25.10
waitress==1.4.4
-Werkzeug==1.0.1
-
-setuptools~=49.2.0
\ No newline at end of file
+Werkzeug==1.0.1
\ No newline at end of file
diff --git a/streampipes-wrapper-python/setup.py b/streampipes-wrapper-python/setup.py
index 63b792f..ea83329 100644
--- a/streampipes-wrapper-python/setup.py
+++ b/streampipes-wrapper-python/setup.py
@@ -28,7 +28,6 @@ with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') a
setup(
name='apache-streampipes-python',
version='0.67.0-SNAPSHOT',
- #packages=["streampipes"],
packages=find_packages(),
url='https://github.com/apache/incubator-streampipes',
license='https://www.apache.org/licenses/LICENSE-2.0',
@@ -41,7 +40,8 @@ setup(
install_requires=[
'confluent-kafka==1.4.2',
'Flask==1.1.2',
- 'waitress==1.4.4',
+ 'flask-classful==0.14.2',
+ 'bjoern==3.1.0',
'python-consul==1.1.0'
],
tests_require=[],
diff --git a/streampipes-wrapper-python/streampipes/api/resources/base.py b/streampipes-wrapper-python/streampipes/api/resources/base.py
new file mode 100644
index 0000000..8faeec1
--- /dev/null
+++ b/streampipes-wrapper-python/streampipes/api/resources/base.py
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import abc
+import os
+from flask import jsonify, make_response
+from flask_classful import FlaskView, route
+from streampipes.api.resources.decorator import consumes
+
+
+class Element(FlaskView):
+ __metaclass__ = abc.ABC
+
+ @route('/<element_id>', methods=['GET'])
+ def get_description(self, element_id: str):
+ # TODO: get element description
+ # TODO: get element from declarer
+ resp = {'element_id': element_id, 'description': 'dummy description'}
+ return make_response(jsonify(resp), 200)
+
+ @route('/<element_id>/assets', methods=['GET'])
+ def get_assets(self, element_id: str):
+ # TODO: send zipped asset
+ resp = {'element_id': element_id, 'asset': 'dummy asset'}
+ return make_response(jsonify(resp), 200)
+
+ @route('/<element_id>/assets/icon', methods=['GET'])
+ def get_assets_icon(self, element_id: str):
+ # TODO: return icon as byte array
+ resp = {'element_id': element_id, 'asset': 'dummy asset', 'icon': 'dummy icon'}
+ return make_response(jsonify(resp), 200)
+
+ @route('/<element_id>/assets/documentation', methods=['GET'])
+ def get_assets_docs(self, element_id: str):
+ # TODO: return documentation (markdown) content
+ resp = {'element_id': element_id, 'asset': 'dummy asset', 'docs': 'dummy docs'}
+ return make_response(jsonify(resp), 200)
+
+ @abc.abstractmethod
+ def get_element_declarers(self):
+ raise NotImplementedError()
+
+ def get_declarer_by_id(self, element_id: str):
+ return self.get_element_declarers().get(element_id=element_id)
+
+ @classmethod
+ def _get_json_ld(cls):
+ pass
+
+ @classmethod
+ def _make_grounding(cls):
+ pass
+
+ @classmethod
+ def _make_icon_path(cls, element_id: str):
+ return cls._make_path(element_id, 'icon.png')
+
+ @classmethod
+ def _make_documentation_path(cls, element_id: str):
+ return cls._make_path(element_id, 'documentation.md')
+
+ @classmethod
+ def _make_path(cls, element_id: str, asset_appendix: str):
+ return element_id + '/' + asset_appendix
+
+
+class InvocableElement(Element):
+ __metaclass__ = abc.ABC
+
+ @route('/<element_id>', methods=['POST'])
+ @consumes(media_type=['application/json'])
+ def invoke_runtime(self, element_id: str):
+ # TODO: parse invocation graph
+ # payload = request.json
+ resp = {'element_id': element_id, 'status': 'sucess'}
+ return make_response(jsonify(resp), 200)
+
+ @route('/<element_id>/configurations', methods=['POST'])
+ @consumes(media_type=['application/json'])
+ def fetch_configurations(self, element_id: str):
+ # payload = request.json
+ resp = {'element_id': element_id, 'config': 'sucess'}
+ return make_response(jsonify(resp), 200)
+
+ @route('/<element_id>/output', methods=['POST'])
+ @consumes(media_type=['application/json'])
+ def fetch_output_configurations(self, element_id: str):
+ # payload = request.json
+ resp = {'element_id': element_id, 'output': 'sucess'}
+ return make_response(jsonify(resp), 200)
+
+ @route('/<element_id>/<running_instance_id>', methods=['DELETE'])
+ def detach(self, element_id: str, running_instance_id: str):
+ resp = {'element_id': element_id, 'running_instance_id': running_instance_id}
+ return make_response(jsonify(resp), 200)
+
+ @abc.abstractmethod
+ def get_element_declarers(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def get_instance_id(self, uri: str, element_id: str):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def get_extractor(self, graph):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def create_grounding_debug_information(self, graph):
+ raise NotImplementedError()
+
+ @staticmethod
+ def _is_debug():
+ return True if os.getenv('SP_DEBUG') == 'true' else False
diff --git a/streampipes-wrapper-python/streampipes/api/resources/welcome.py b/streampipes-wrapper-python/streampipes/api/resources/decorator.py
similarity index 65%
copy from streampipes-wrapper-python/streampipes/api/resources/welcome.py
copy to streampipes-wrapper-python/streampipes/api/resources/decorator.py
index 227e939..2b805a7 100644
--- a/streampipes-wrapper-python/streampipes/api/resources/welcome.py
+++ b/streampipes-wrapper-python/streampipes/api/resources/decorator.py
@@ -14,18 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from flask import Blueprint, make_response, jsonify, request
+import functools
+from flask import request, abort
-welcome_blueprint = Blueprint('welcome', __name__)
-
-
-@welcome_blueprint.route('/', methods=['GET'])
-def get_welcome_page_html():
-
- if request.content_type == 'application/json':
- resp = {'welcome': 'hello-world!'}
- return make_response(jsonify(resp), 200)
-
- elif request.content_type == 'text/html':
- return "<p>Got it!</p>"
+def consumes(media_type):
+ def consumes_decorator(f):
+ @functools.wraps(f)
+ def decorated_function(*args, **kwargs):
+ for entry in media_type:
+ if request.content_type == entry:
+ return f(*args, **kwargs)
+ abort(404)
+ return decorated_function
+ return consumes_decorator
diff --git a/streampipes-wrapper-python/streampipes/api/resources/sepa.py b/streampipes-wrapper-python/streampipes/api/resources/processor.py
similarity index 62%
rename from streampipes-wrapper-python/streampipes/api/resources/sepa.py
rename to streampipes-wrapper-python/streampipes/api/resources/processor.py
index f7eeefb..14602f6 100644
--- a/streampipes-wrapper-python/streampipes/api/resources/sepa.py
+++ b/streampipes-wrapper-python/streampipes/api/resources/processor.py
@@ -14,19 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from flask import Blueprint, request, make_response, jsonify
-from streampipes.manager import ProcessorDispatcher
+from streampipes.api.resources.base import InvocableElement
-sepa_blueprint = Blueprint('sepa', __name__)
+class SepaElementResource(InvocableElement):
-@sepa_blueprint.route('/invoke', methods=['POST'])
-def invoke():
- resp = ProcessorDispatcher.start(**request.json)
- return make_response(jsonify(resp), 200)
+ def get_instance_id(self, uri: str, element_id: str):
+ pass
+ def get_element_declarers(self):
+ pass
+
+ def get_extractor(self, graph):
+ pass
+
+ def create_grounding_debug_information(self, graph):
+ pass
-@sepa_blueprint.route('/detach', methods=['POST'])
-def detach():
- resp = ProcessorDispatcher.stop(**request.json)
- return make_response(jsonify(resp), 200)
diff --git a/streampipes-wrapper-python/streampipes/api/resources/welcome.py b/streampipes-wrapper-python/streampipes/api/resources/welcome.py
index 227e939..3f531d7 100644
--- a/streampipes-wrapper-python/streampipes/api/resources/welcome.py
+++ b/streampipes-wrapper-python/streampipes/api/resources/welcome.py
@@ -14,18 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from flask import Blueprint, make_response, jsonify, request
+from flask import make_response, jsonify, request
+from flask_classful import FlaskView
+from streampipes.api.resources.decorator import consumes
-welcome_blueprint = Blueprint('welcome', __name__)
+class WelcomeResource(FlaskView):
-@welcome_blueprint.route('/', methods=['GET'])
-def get_welcome_page_html():
-
- if request.content_type == 'application/json':
- resp = {'welcome': 'hello-world!'}
- return make_response(jsonify(resp), 200)
-
- elif request.content_type == 'text/html':
- return "<p>Got it!</p>"
-
+ @consumes(media_type=['application/json','text/html'])
+ def index(self):
+ if request.content_type == 'application/json':
+ resp = {'welcome': 'hello-world!'}
+ return make_response(jsonify(resp), 200)
+ elif request.content_type == 'text/html':
+ return "<p>Got it!</p>"
+ return 'welcome!'
diff --git a/streampipes-wrapper-python/streampipes/api/rest.py b/streampipes-wrapper-python/streampipes/api/rest.py
index 702011e..8d15b8d 100644
--- a/streampipes-wrapper-python/streampipes/api/rest.py
+++ b/streampipes-wrapper-python/streampipes/api/rest.py
@@ -15,32 +15,31 @@
# limitations under the License.
#
""" API endpoints """
-import sys
import threading
+import bjoern
from flask import Flask
-from waitress import serve
-
-
-class FlaskProductionConfig(object):
- DEBUG = False
- DEVELOPMENT = False
+from streampipes.api.resources.processor import SepaElementResource
+from streampipes.api.resources.welcome import WelcomeResource
class PipelineElementApi(object):
+ _FLASK_CONFIG = {
+ 'DEBUG': False,
+ 'DEVELOPMENT': False
+ }
def __init__(self):
self.app = Flask(__name__, instance_relative_config=False)
- self.app.config.from_object(FlaskProductionConfig)
+ self.app.config.from_object(self._FLASK_CONFIG)
with self.app.app_context():
- # import endpoints
- from streampipes.api.resources import welcome
- from streampipes.api.resources import sepa
- # register blueprints
- self.app.register_blueprint(welcome.welcome_blueprint)
- self.app.register_blueprint(sepa.sepa_blueprint)
+ # register resources
+ SepaElementResource.register(self.app, route_base='/', route_prefix='sepa')
+ WelcomeResource.register(self.app, route_base='/')
def run(self, port: int):
- threading.Thread(target=serve, kwargs={'app': self.app, 'host': '0.0.0.0', 'port': port}).start()
+ print('serving API via bjoern WSGI server ... {}:{}'.format('0.0.0.0', port))
+ threading.Thread(target=bjoern.run, args=(self.app,), kwargs={'host': '0.0.0.0', 'port': int(port)}).start()
+
diff --git a/streampipes-wrapper-python/streampipes/core.py b/streampipes-wrapper-python/streampipes/core.py
index 4fd2f4d..f8140a3 100644
--- a/streampipes-wrapper-python/streampipes/core.py
+++ b/streampipes-wrapper-python/streampipes/core.py
@@ -21,7 +21,6 @@ import threading
from abc import ABC, abstractmethod
from confluent_kafka.admin import AdminClient
from confluent_kafka import Producer, Consumer
-
from streampipes.api.rest import PipelineElementApi
from streampipes.base.banner import banner
from streampipes.model.pipeline_element_config import Config
diff --git a/streampipes-wrapper-python/streampipes/model/pipeline_element_config.py b/streampipes-wrapper-python/streampipes/model/pipeline_element_config.py
index af0a40e..0976967 100644
--- a/streampipes-wrapper-python/streampipes/model/pipeline_element_config.py
+++ b/streampipes-wrapper-python/streampipes/model/pipeline_element_config.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import logging
import os
from streampipes.utils.register import ConsulUtils
@@ -22,29 +21,34 @@ from streampipes.utils.register import ConsulUtils
class Config(object):
def __init__(self, app_id: str) -> None:
- self.logger = logging.getLogger(__name__)
+ if not app_id:
+ raise ValueError
+
self.app_id = app_id
- self.host: str
- self.port: int
- self.service: str
+ self.host: str = ""
+ self.port: int = 0
+ self.service: str = ""
def register(self, type: str, env_key: str, default, description: str, configuration_scope=None, is_password=None):
+ if not type:
+ raise ValueError
+ if not env_key:
+ raise ValueError
+ if not default:
+ raise ValueError
if type is 'host':
- self.host = self.__env_or_default(env_key, default)
+ self.host = self._env_or_default(env_key, default)
elif type is 'port':
- self.port = self.__env_or_default(env_key, default)
+ self.port = self._env_or_default(env_key, default)
elif type is 'service':
- self.service = self.__env_or_default(env_key, default)
+ self.service = self._env_or_default(env_key, default)
ConsulUtils().register_config(self.app_id, env_key, default, description, configuration_scope, is_password)
@staticmethod
- def __env_or_default(key, default):
+ def _env_or_default(key, default):
if key is not None:
if os.getenv(key):
return os.getenv(key)
- else:
- return default
- else:
- return default
+ return default
diff --git a/streampipes-wrapper-python/streampipes/utils/register.py b/streampipes-wrapper-python/streampipes/utils/register.py
index 5ab3242..01db4d6 100644
--- a/streampipes-wrapper-python/streampipes/utils/register.py
+++ b/streampipes-wrapper-python/streampipes/utils/register.py
@@ -18,7 +18,6 @@
import os
import consul
from consul import Check
-
from streampipes.model.config_item import ConfigItem
@@ -31,9 +30,17 @@ class ConsulUtils(object):
}
def __init__(self):
- self.consul = self.__consul()
+ self.consul = self._consul()
def register_service(self, app_id: str, host: str, port: int):
+ if not app_id:
+ raise ValueError
+ if not host:
+ raise ValueError
+ if not port:
+ raise ValueError
+
+ print('register service at consul for key: {}'.format(app_id))
self.consul.agent.service.register(name='pe',
service_id=app_id,
address=host,
@@ -46,19 +53,28 @@ class ConsulUtils(object):
def register_config(self, app_id: str, env_key: str, default, description: str, configuration_scope=None,
is_password=None):
+ if not app_id:
+ raise ValueError
+ if not env_key:
+ raise ValueError
+ if not default:
+ raise ValueError
+ if not description:
+ raise ValueError
+
config_item = ConfigItem()
if env_key is not None:
if os.getenv(env_key):
env_value = os.getenv(env_key)
config_item.value = env_value
- config_item.value_type = self.__check_default_type(env_value)
+ config_item.value_type = self._check_default_type(env_value)
else:
config_item.value = default
- config_item.value_type = self.__check_default_type(default)
+ config_item.value_type = self._check_default_type(default)
else:
config_item.value = default
- config_item.value_type = self.__check_default_type(default)
+ config_item.value_type = self._check_default_type(default)
config_item.description = description
@@ -73,17 +89,18 @@ class ConsulUtils(object):
else:
config_item.is_password = False
- key = self.__get_consul_key(app_id, env_key)
+ key = self._get_consul_key(app_id, env_key)
index, data = self.consul.kv.get(key)
# TODO: update existing keys?
if data is None:
+ print('register config item at consul for key: {}'.format(env_key))
self.consul.kv.put(key, config_item.to_json())
- def __get_consul_key(self, app_id, key):
+ def _get_consul_key(self, app_id, key):
return self._DEFAULT_CONSUL_CONFIG['CONSUL_BASIC_ROUTE'] + app_id + '/' + key
@staticmethod
- def __env_or_default(key, default):
+ def _env_or_default(key, default):
if key is not None:
if os.getenv(key):
return os.getenv(key)
@@ -93,7 +110,7 @@ class ConsulUtils(object):
return default
@staticmethod
- def __check_default_type(value) -> str:
+ def _check_default_type(value) -> str:
if isinstance(value, int):
return 'xs:integer'
elif isinstance(value, str):
@@ -103,7 +120,7 @@ class ConsulUtils(object):
elif isinstance(value, float):
return 'xs:float'
- def __consul(self):
+ def _consul(self):
if os.getenv('CONSUL_LOCATION'):
return consul.Consul(host=os.getenv('CONSUL_LOCATION'),
port=self._DEFAULT_CONSUL_CONFIG['CONSUL_PORT'])