You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/04/26 16:56:41 UTC

[streampipes] branch 1478-distinction-between-consumer-and-publisher updated: Enable output streams for Kafka

This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 1478-distinction-between-consumer-and-publisher
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1478-distinction-between-consumer-and-publisher by this push:
     new f4d2f35da Enable output streams for Kafka
f4d2f35da is described below

commit f4d2f35da684cdedc1630ad43bf7671f6150a5ca
Author: Sven Oehler <oe...@web.de>
AuthorDate: Wed Apr 26 18:56:45 2023 +0200

    Enable output streams for Kafka
---
 ...ine-learning-on-a-streampipes-data-stream.ipynb | 18 +++++++-----
 .../streampipes/function_zoo/river_function.py     |  8 ++++-
 .../streampipes/functions/broker/broker_handler.py | 27 ++++++++++++++++-
 .../functions/broker/kafka/kafka_publisher.py      |  1 -
 .../functions/utils/data_stream_generator.py       | 34 ++++++++++++++++++----
 5 files changed, 73 insertions(+), 15 deletions(-)

diff --git a/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
index 8e6673ab4..780464d3d 100644
--- a/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb
@@ -23,14 +23,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
    "outputs": [],
    "source": [
     "# you can install all required dependecies for this tutorial by executing the following command\n",
     "%pip install river streampipes"
-   ],
-   "metadata": {
-    "collapsed": false
-   }
+   ]
   },
   {
    "cell_type": "code",
@@ -42,7 +42,10 @@
     "\n",
     "os.environ[\"USER\"] = \"admin@streampipes.apache.org\"\n",
     "os.environ[\"API-KEY\"] = \"XXX\"\n",
-    "os.environ[\"BROKER-HOST\"] = \"localhost\""
+    "\n",
+    "# Use this if you work locally:\n",
+    "os.environ[\"BROKER-HOST\"] = \"localhost\"  \n",
+    "os.environ[\"KAFKA-PORT\"] = \"9094\" # When using Kafka as message broker"
    ]
   },
   {
@@ -324,7 +327,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from river import cluster, compose, preprocessing, tree\n",
+    "import pickle\n",
+    "from river import compose, tree\n",
     "from streampipes.function_zoo.river_function import OnlineML\n",
     "from streampipes.functions.utils.data_stream_generator import RuntimeType\n",
     "\n",
@@ -393,7 +397,7 @@
    "outputs": [],
    "source": [
     "import pickle\n",
-    "from river import cluster, compose, preprocessing, tree\n",
+    "from river import compose, tree\n",
     "from streampipes.function_zoo.river_function import OnlineML\n",
     "from streampipes.functions.utils.data_stream_generator import RuntimeType\n",
     "\n",
diff --git a/streampipes-client-python/streampipes/function_zoo/river_function.py b/streampipes-client-python/streampipes/function_zoo/river_function.py
index b43b40d0c..287f2715b 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -17,6 +17,7 @@
 from typing import Any, Callable, Dict, List, Optional
 
 from streampipes.client.client import StreamPipesClient
+from streampipes.functions.broker.broker_handler import get_broker_enum
 from streampipes.functions.function_handler import FunctionHandler
 from streampipes.functions.registration import Registration
 from streampipes.functions.streampipes_function import StreamPipesFunction
@@ -187,7 +188,12 @@ class OnlineML:
             attributes["truth"] = prediction_type
             if target_label is None:
                 raise ValueError("You must define a target attribute for a supervised model.")
-        output_stream = create_data_stream("prediction", attributes)
+
+        output_stream = create_data_stream(
+            name="prediction",
+            attributes=attributes,
+            broker=get_broker_enum(client.dataStreamApi.get(stream_ids[0])),  # type: ignore
+        )
         function_definition = FunctionDefinition().add_output_data_stream(output_stream)
         self.sp_function = RiverFunction(
             function_definition, stream_ids, model, supervised, target_label, on_start, on_event, on_stop
diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 7cfe78f7c..1cdfc2dee 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -68,7 +68,32 @@ def get_broker(
         return NatsConsumer()
     elif SupportedBroker.KAFKA.value in broker_name:
         if is_publisher:
-            KafkaPublisher()
+            return KafkaPublisher()
         return KafkaConsumer()
     else:
         raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')
+
+
+def get_broker_enum(data_stream: DataStream) -> SupportedBroker:
+    """Derive the enum of the broker for the given data stream.
+
+    Parameters
+    ----------
+    data_stream: DataStream
+        Data stream instance from which the broker is inferred
+
+    Returns
+    -------
+    broker: SupportedBroker
+        The corresponding broker enum derived from data stream.
+
+    Raises
+    ------
+    UnsupportedBrokerError
+        Is raised when the given data stream belongs to a broker that is currently not supported by StreamPipes Python.
+    """
+    broker_name = data_stream.event_grounding.transport_protocols[0].class_name
+    for b in SupportedBroker:
+        if b.value in broker_name:
+            return b
+    raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
index 117fa2cc6..f55337b5a 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
@@ -69,5 +69,4 @@ class KafkaPublisher(Publisher):
         -------
         None
         """
-        self.kafka_producer.close()
         logger.info(f"Stopped connection to stream: {self.stream_id}")
diff --git a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
index e43d15cc4..a97c39186 100644
--- a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
+++ b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
@@ -18,7 +18,13 @@
 from enum import Enum
 from typing import Dict, Optional
 
-from streampipes.model.common import EventProperty, EventSchema
+from streampipes.functions.broker import SupportedBroker
+from streampipes.model.common import (
+    EventGrounding,
+    EventProperty,
+    EventSchema,
+    TransportProtocol,
+)
 from streampipes.model.resource.data_stream import DataStream
 
 
@@ -44,7 +50,12 @@ class RuntimeType(Enum):
 
 
 # TODO Use a more general approach to create a data stream
-def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optional[str] = None):
+def create_data_stream(
+    name: str,
+    attributes: Dict[str, str],
+    stream_id: Optional[str] = None,
+    broker: SupportedBroker = SupportedBroker.NATS,
+):
     """Creates a data stream
 
     Parameters
@@ -83,6 +94,19 @@ def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optiona
         ]
     )
 
-    if not stream_id:
-        return DataStream(name=name, event_schema=event_schema)
-    return DataStream(element_id=stream_id, name=name, event_schema=event_schema)
+    transport_protocols = [TransportProtocol()]
+    if broker == SupportedBroker.KAFKA:
+        transport_protocols = [
+            TransportProtocol(
+                class_name="org.apache.streampipes.model.grounding.KafkaTransportProtocol",  # type: ignore
+                broker_hostname="kafka",
+                kafkaPort=9092,
+            )
+        ]
+
+    data_stream = DataStream(
+        name=name, event_schema=event_schema, event_grounding=EventGrounding(transport_protocols=transport_protocols)
+    )
+    if stream_id:
+        data_stream.element_id = stream_id
+    return data_stream