Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/04/26 16:56:41 UTC
[streampipes] branch 1478-distinction-between-consumer-and-publisher updated: Enable output streams for Kafka
This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch 1478-distinction-between-consumer-and-publisher
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1478-distinction-between-consumer-and-publisher by this push:
new f4d2f35da Enable output streams for Kafka
f4d2f35da is described below
commit f4d2f35da684cdedc1630ad43bf7671f6150a5ca
Author: Sven Oehler <oe...@web.de>
AuthorDate: Wed Apr 26 18:56:45 2023 +0200
Enable output streams for Kafka
---
 ...ine-learning-on-a-streampipes-data-stream.ipynb | 18 +++++++-----
 .../streampipes/function_zoo/river_function.py     |  8 ++++-
 .../streampipes/functions/broker/broker_handler.py | 27 ++++++++++++++++-
 .../functions/broker/kafka/kafka_publisher.py      |  1 -
 .../functions/utils/data_stream_generator.py       | 34 ++++++++++++++++++----
5 files changed, 73 insertions(+), 15 deletions(-)
diff --git a/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
index 8e6673ab4..780464d3d 100644
--- a/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
@@ -23,14 +23,14 @@
{
"cell_type": "code",
"execution_count": null,
+ "metadata": {
+ "collapsed": false
+ },
"outputs": [],
"source": [
"# you can install all required dependecies for this tutorial by executing the following command\n",
"%pip install river streampipes"
- ],
- "metadata": {
- "collapsed": false
- }
+ ]
},
{
"cell_type": "code",
@@ -42,7 +42,10 @@
"\n",
"os.environ[\"USER\"] = \"admin@streampipes.apache.org\"\n",
"os.environ[\"API-KEY\"] = \"XXX\"\n",
- "os.environ[\"BROKER-HOST\"] = \"localhost\""
+ "\n",
+ "# Use this if you work locally:\n",
+ "os.environ[\"BROKER-HOST\"] = \"localhost\" \n",
+ "os.environ[\"KAFKA-PORT\"] = \"9094\" # When using Kafka as message broker"
]
},
{
@@ -324,7 +327,8 @@
"metadata": {},
"outputs": [],
"source": [
- "from river import cluster, compose, preprocessing, tree\n",
+ "import pickle\n",
+ "from river import compose, tree\n",
"from streampipes.function_zoo.river_function import OnlineML\n",
"from streampipes.functions.utils.data_stream_generator import RuntimeType\n",
"\n",
@@ -393,7 +397,7 @@
"outputs": [],
"source": [
"import pickle\n",
- "from river import cluster, compose, preprocessing, tree\n",
+ "from river import compose, tree\n",
"from streampipes.function_zoo.river_function import OnlineML\n",
"from streampipes.functions.utils.data_stream_generator import RuntimeType\n",
"\n",
diff --git a/streampipes-client-python/streampipes/function_zoo/river_function.py b/streampipes-client-python/streampipes/function_zoo/river_function.py
index b43b40d0c..287f2715b 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -17,6 +17,7 @@
 from typing import Any, Callable, Dict, List, Optional
 
 from streampipes.client.client import StreamPipesClient
+from streampipes.functions.broker.broker_handler import get_broker_enum
 from streampipes.functions.function_handler import FunctionHandler
 from streampipes.functions.registration import Registration
 from streampipes.functions.streampipes_function import StreamPipesFunction
@@ -187,7 +188,12 @@ class OnlineML:
attributes["truth"] = prediction_type
if target_label is None:
raise ValueError("You must define a target attribute for a supervised model.")
- output_stream = create_data_stream("prediction", attributes)
+
+ output_stream = create_data_stream(
+ name="prediction",
+ attributes=attributes,
+ broker=get_broker_enum(client.dataStreamApi.get(stream_ids[0])), # type: ignore
+ )
function_definition = FunctionDefinition().add_output_data_stream(output_stream)
self.sp_function = RiverFunction(
function_definition, stream_ids, model, supervised, target_label, on_start, on_event, on_stop
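The effect of this change: the generated "prediction" output stream is now grounded on the same broker as the first input stream instead of always defaulting to NATS. A rough standalone sketch of the equivalent logic (client, stream_ids, and attributes are assumed to exist as in the class):

    from streampipes.functions.broker.broker_handler import get_broker_enum
    from streampipes.functions.utils.data_stream_generator import create_data_stream

    input_stream = client.dataStreamApi.get(stream_ids[0])  # first input stream
    broker = get_broker_enum(input_stream)                  # e.g. SupportedBroker.KAFKA
    output_stream = create_data_stream(
        name="prediction",
        attributes=attributes,
        broker=broker,  # output stream lands on the same broker as the input
    )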
diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 7cfe78f7c..1cdfc2dee 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -68,7 +68,32 @@ def get_broker(
         return NatsConsumer()
     elif SupportedBroker.KAFKA.value in broker_name:
         if is_publisher:
-            KafkaPublisher()
+            return KafkaPublisher()
         return KafkaConsumer()
     else:
         raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')
+
+
+def get_broker_enum(data_stream: DataStream) -> SupportedBroker:
+    """Derive the enum of the broker for the given data stream.
+
+    Parameters
+    ----------
+    data_stream: DataStream
+        Data stream instance from which the broker is inferred
+
+    Returns
+    -------
+    broker: SupportedBroker
+        The corresponding broker enum derived from the data stream.
+
+    Raises
+    ------
+    UnsupportedBrokerError
+        Is raised when the given data stream belongs to a broker that is currently not supported by StreamPipes Python.
+    """
+    broker_name = data_stream.event_grounding.transport_protocols[0].class_name
+    for b in SupportedBroker:
+        if b.value in broker_name:
+            return b
+    raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')
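A hypothetical usage sketch for the new helper (the element id is a placeholder and the client is assumed to be set up already):

    from streampipes.functions.broker import SupportedBroker
    from streampipes.functions.broker.broker_handler import (
        UnsupportedBrokerError,
        get_broker_enum,
    )

    stream = client.dataStreamApi.get("sp:spdatastream:example")  # placeholder id
    try:
        broker = get_broker_enum(stream)  # inspects the transport protocol class name
        if broker == SupportedBroker.KAFKA:
            print("stream is grounded on Kafka")
    except UnsupportedBrokerError as err:
        print(err)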
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
index 117fa2cc6..f55337b5a 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
@@ -69,5 +69,4 @@ class KafkaPublisher(Publisher):
         -------
         None
         """
-        self.kafka_producer.close()
         logger.info(f"Stopped connection to stream: {self.stream_id}")
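The removed call suggests the wrapped producer exposes no close() method. If buffered events should still be delivered on shutdown, a flush would be the natural replacement; a sketch, assuming the publisher wraps a confluent_kafka.Producer (an assumption, not confirmed by this diff):

    # Hypothetical alternative disconnect, not part of this commit:
    def disconnect(self) -> None:
        self.kafka_producer.flush()  # deliver buffered messages before stopping
        logger.info(f"Stopped connection to stream: {self.stream_id}")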
diff --git a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
index e43d15cc4..a97c39186 100644
--- a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
+++ b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
@@ -18,7 +18,13 @@
 from enum import Enum
 from typing import Dict, Optional
 
-from streampipes.model.common import EventProperty, EventSchema
+from streampipes.functions.broker import SupportedBroker
+from streampipes.model.common import (
+    EventGrounding,
+    EventProperty,
+    EventSchema,
+    TransportProtocol,
+)
 from streampipes.model.resource.data_stream import DataStream
@@ -44,7 +50,12 @@ class RuntimeType(Enum):
 
 
 # TODO Use a more general approach to create a data stream
-def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optional[str] = None):
+def create_data_stream(
+    name: str,
+    attributes: Dict[str, str],
+    stream_id: Optional[str] = None,
+    broker: SupportedBroker = SupportedBroker.NATS,
+):
     """Creates a data stream
 
     Parameters
@@ -83,6 +94,19 @@ def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optiona
         ]
     )
-    if not stream_id:
-        return DataStream(name=name, event_schema=event_schema)
-    return DataStream(element_id=stream_id, name=name, event_schema=event_schema)
+    transport_protocols = [TransportProtocol()]
+    if broker == SupportedBroker.KAFKA:
+        transport_protocols = [
+            TransportProtocol(
+                class_name="org.apache.streampipes.model.grounding.KafkaTransportProtocol",  # type: ignore
+                broker_hostname="kafka",
+                kafkaPort=9092,
+            )
+        ]
+
+    data_stream = DataStream(
+        name=name, event_schema=event_schema, event_grounding=EventGrounding(transport_protocols=transport_protocols)
+    )
+    if stream_id:
+        data_stream.element_id = stream_id
+    return data_stream
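A short sketch of the extended generator in use; the transport protocol class name matches the patch above, while the stream name and attributes are illustrative:

    from streampipes.functions.broker import SupportedBroker
    from streampipes.functions.utils.data_stream_generator import (
        RuntimeType,
        create_data_stream,
    )

    # Kafka grounding; hostname "kafka" and port 9092 are the defaults from the patch.
    stream = create_data_stream(
        name="prediction",
        attributes={"prediction": RuntimeType.FLOAT.value},
        broker=SupportedBroker.KAFKA,
    )
    print(stream.event_grounding.transport_protocols[0].class_name)
    # -> org.apache.streampipes.model.grounding.KafkaTransportProtocol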