You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/04/04 15:10:05 UTC

[streampipes] branch 1478-distinction-between-consumer-and-publisher updated: Add consumer and publisher

This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 1478-distinction-between-consumer-and-publisher
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1478-distinction-between-consumer-and-publisher by this push:
     new 2c7aee594 Add consumer and publisher
2c7aee594 is described below

commit 2c7aee59482e0f308859b979da003c58747e60cd
Author: Sven Oehler <oe...@web.de>
AuthorDate: Tue Apr 4 17:09:42 2023 +0200

    Add consumer and publisher
---
 .../streampipes/functions/broker/__init__.py       | 19 +++++--
 .../streampipes/functions/broker/broker.py         | 45 ++-------------
 .../streampipes/functions/broker/broker_handler.py | 20 +++++--
 .../streampipes/functions/broker/consumer.py       | 65 ++++++++++++++++++++++
 .../functions/broker/{ => kafka}/__init__.py       | 13 -----
 .../{kafka_broker.py => kafka/kafka_consumer.py}   | 36 ++++--------
 .../broker/{ => kafka}/kafka_message_fetcher.py    |  0
 .../{nats_broker.py => kafka/kafka_publisher.py}   | 46 ++++-----------
 .../functions/broker/{ => nats}/__init__.py        | 13 -----
 .../{nats_broker.py => nats/nats_consumer.py}      | 31 +++--------
 .../{nats_broker.py => nats/nats_publisher.py}     | 34 ++---------
 .../functions/broker/output_collector.py           | 14 ++---
 .../functions/broker/{__init__.py => publisher.py} | 35 ++++++++----
 .../streampipes/functions/function_handler.py      |  5 +-
 .../functions/utils/data_stream_context.py         |  8 +--
 .../tests/functions/test_function_handler.py       | 28 +++++-----
 .../tests/functions/test_river_function.py         | 24 ++++----
 17 files changed, 201 insertions(+), 235 deletions(-)

diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/__init__.py
index 32d35fba3..687b55b36 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/__init__.py
@@ -15,15 +15,26 @@
 # limitations under the License.
 #
 from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
+from .consumer import Consumer
+from .publisher import Publisher
+
+# isort: split
+
+from .kafka.kafka_consumer import KafkaConsumer
+from .kafka.kafka_publisher import KafkaPublisher
+from .nats.nats_consumer import NatsConsumer
+from .nats.nats_publisher import NatsPublisher
 
 from .broker_handler import SupportedBroker, get_broker  # isort: skip
 
 __all__ = [
     "Broker",
-    "KafkaBroker",
-    "NatsBroker",
+    "Consumer",
+    "Publisher",
     "SupportedBroker",
     "get_broker",
+    "KafkaConsumer",
+    "KafkaPublisher",
+    "NatsConsumer",
+    "NatsPublisher",
 ]
diff --git a/streampipes-client-python/streampipes/functions/broker/broker.py b/streampipes-client-python/streampipes/functions/broker/broker.py
index 7697bda11..53b1c075c 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker.py
@@ -16,15 +16,14 @@
 #
 import os
 from abc import ABC, abstractmethod
-from typing import Any, AsyncIterator, Dict
 
 from streampipes.model.resource.data_stream import DataStream
 
 
 class Broker(ABC):
-    """Abstract implementation of a broker.
+    """Abstract implementation of a broker for consumer and publisher.
 
-    A broker allows both to subscribe to a data stream and to publish events to a data stream.
+    It contains the basic logic to connect to a data stream.
     """
 
     async def connect(self, data_stream: DataStream) -> None:
@@ -45,10 +44,10 @@ class Broker(ABC):
         hostname = transport_protocol.broker_hostname
         if "BROKER-HOST" in os.environ.keys():
             hostname = os.environ["BROKER-HOST"]
-        await self._makeConnection(hostname, transport_protocol.port)
+        await self._make_connection(hostname, transport_protocol.port)
 
     @abstractmethod
-    async def _makeConnection(self, hostname: str, port: int) -> None:
+    async def _make_connection(self, hostname: str, port: int) -> None:
         """Helper function to connect to a server.
 
         Parameters
@@ -64,31 +63,6 @@ class Broker(ABC):
         """
         raise NotImplementedError  # pragma: no cover
 
-    @abstractmethod
-    async def createSubscription(self) -> None:
-        """Creates a subscription to a data stream.
-
-        Returns
-        -------
-        None
-        """
-        raise NotImplementedError  # pragma: no cover
-
-    @abstractmethod
-    async def publish_event(self, event: Dict[str, Any]) -> None:
-        """Publish an event to a connected data stream.
-
-        Parameters
-        ----------
-        event: Dict[str, Any]
-            The event to be published.
-
-        Returns
-        -------
-        None
-        """
-        raise NotImplementedError  # pragma: no cover
-
     @abstractmethod
     async def disconnect(self) -> None:
         """Closes the connection to the server.
@@ -98,14 +72,3 @@ class Broker(ABC):
         None
         """
         raise NotImplementedError  # pragma: no cover
-
-    @abstractmethod
-    def get_message(self) -> AsyncIterator:
-        """Get the published messages of the subscription.
-
-        Returns
-        -------
-        iterator: AsyncIterator
-            An async iterator for the messages.
-        """
-        raise NotImplementedError  # pragma: no cover
diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 6bf070486..7cfe78f7c 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -16,7 +16,13 @@
 #
 from enum import Enum
 
-from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker
+from streampipes.functions.broker import (
+    Broker,
+    KafkaConsumer,
+    KafkaPublisher,
+    NatsConsumer,
+    NatsPublisher,
+)
 from streampipes.model.resource.data_stream import DataStream
 
 
@@ -35,7 +41,9 @@ class UnsupportedBrokerError(Exception):
         super().__init__(message)
 
 
-def get_broker(data_stream: DataStream) -> Broker:  # TODO implementation for more transport_protocols
+def get_broker(
+    data_stream: DataStream, is_publisher: bool = False
+) -> Broker:  # TODO implementation for more transport_protocols
     """Derive the broker for the given data stream.
 
     Parameters
@@ -55,8 +63,12 @@ def get_broker(data_stream: DataStream) -> Broker:  # TODO implementation for mo
     """
     broker_name = data_stream.event_grounding.transport_protocols[0].class_name
     if SupportedBroker.NATS.value in broker_name:
-        return NatsBroker()
+        if is_publisher:
+            return NatsPublisher()
+        return NatsConsumer()
     elif SupportedBroker.KAFKA.value in broker_name:
-        return KafkaBroker()
+        if is_publisher:
+            return KafkaPublisher()
+        return KafkaConsumer()
     else:
         raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')
diff --git a/streampipes-client-python/streampipes/functions/broker/consumer.py b/streampipes-client-python/streampipes/functions/broker/consumer.py
new file mode 100644
index 000000000..03e988504
--- /dev/null
+++ b/streampipes-client-python/streampipes/functions/broker/consumer.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import abstractmethod
+from typing import AsyncIterator
+
+from streampipes.functions.broker import Broker
+from streampipes.model.resource.data_stream import DataStream
+
+
+class Consumer(Broker):
+    """Abstract implementation of a consumer for a broker.
+
+    A consumer allows subscribing to a data stream.
+    """
+
+    async def connect(self, data_stream: DataStream) -> None:
+        """Connects to the broker running in StreamPipes and creates a subscription.
+
+        Parameters
+        ----------
+        data_stream: DataStream
+            Contains the meta information (resources) for a data stream.
+
+        Returns
+        -------
+        None
+        """
+        await super().connect(data_stream)
+        await self._create_subscription()
+
+    @abstractmethod
+    async def _create_subscription(self) -> None:
+        """Creates a subscription to a data stream.
+
+        Returns
+        -------
+        None
+
+        """
+        raise NotImplementedError  # pragma: no cover
+
+    @abstractmethod
+    def get_message(self) -> AsyncIterator:
+        """Get the published messages of the subscription.
+
+        Returns
+        -------
+        iterator: AsyncIterator
+            An async iterator for the messages.
+        """
+        raise NotImplementedError  # pragma: no cover
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/kafka/__init__.py
similarity index 73%
copy from streampipes-client-python/streampipes/functions/broker/__init__.py
copy to streampipes-client-python/streampipes/functions/broker/kafka/__init__.py
index 32d35fba3..cce3acad3 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/__init__.py
@@ -14,16 +14,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
-
-from .broker_handler import SupportedBroker, get_broker  # isort: skip
-
-__all__ = [
-    "Broker",
-    "KafkaBroker",
-    "NatsBroker",
-    "SupportedBroker",
-    "get_broker",
-]
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
similarity index 71%
rename from streampipes-client-python/streampipes/functions/broker/kafka_broker.py
rename to streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
index 9bc1337a2..18bb97776 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
@@ -16,20 +16,20 @@
 #
 
 import logging
-from typing import Any, AsyncIterator, Dict
+from typing import AsyncIterator
 
-from confluent_kafka import Consumer  # type: ignore
-from streampipes.functions.broker.broker import Broker
-from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
+from confluent_kafka import Consumer as KafkaConnection  # type: ignore
+from streampipes.functions.broker import Consumer
+from streampipes.functions.broker.kafka.kafka_message_fetcher import KafkaMessageFetcher
 from streampipes.model.common import random_letters
 
 logger = logging.getLogger(__name__)
 
 
-class KafkaBroker(Broker):
-    """Implementation of the broker for Kafka"""
+class KafkaConsumer(Consumer):
+    """Implementation of a consumer for Kafka"""
 
-    async def _makeConnection(self, hostname: str, port: int) -> None:
+    async def _make_connection(self, hostname: str, port: int) -> None:
         """Helper function to connect to a server.
 
         Parameters
@@ -45,11 +45,12 @@ class KafkaBroker(Broker):
         -------
         None
         """
-        self.kafka_consumer = Consumer(
+        self.kafka_consumer = KafkaConnection(
             {"bootstrap.servers": f"{hostname}:{port}", "group.id": random_letters(6), "auto.offset.reset": "latest"}
         )
+        logger.info(f"Connected to Kafka at {hostname}:{port}")
 
-    async def createSubscription(self) -> None:
+    async def _create_subscription(self) -> None:
         """Creates a subscription to a data stream.
 
         Returns
@@ -57,24 +58,8 @@ class KafkaBroker(Broker):
         None
         """
         self.kafka_consumer.subscribe([self.topic_name])
-
         logger.info(f"Subscribed to stream: {self.stream_id}")
 
-    async def publish_event(self, event: Dict[str, Any]):
-        """Publish an event to a connected data stream.
-
-        Parameters
-        ----------
-        event: Dict[str, Any]
-            The event to be published.
-
-        Returns
-        -------
-        None
-        """
-
-        # await self.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
-
     async def disconnect(self) -> None:
         """Closes the connection to the server.
 
@@ -93,5 +78,4 @@ class KafkaBroker(Broker):
         iterator: AsyncIterator
             An async iterator for the messages.
         """
-
         return KafkaMessageFetcher(self.kafka_consumer)
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
similarity index 100%
rename from streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
rename to streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
diff --git a/streampipes-client-python/streampipes/functions/broker/nats_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
similarity index 57%
copy from streampipes-client-python/streampipes/functions/broker/nats_broker.py
copy to streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
index 238e14bfd..8e8643e66 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
@@ -14,20 +14,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import json
 import logging
-from typing import Any, AsyncIterator, Dict
+from typing import Any, Dict
 
-from nats import connect
-from streampipes.functions.broker.broker import Broker
+from confluent_kafka import Producer  # type: ignore
+from streampipes.functions.broker import Publisher
 
 logger = logging.getLogger(__name__)
 
 
-class NatsBroker(Broker):
-    """Implementation of the NatsBroker"""
+class KafkaPublisher(Publisher):
+    """Implementation of a publisher for Kafka"""
 
-    async def _makeConnection(self, hostname: str, port: int) -> None:
+    async def _make_connection(self, hostname: str, port: int) -> None:
         """Helper function to connect to a server.
 
         Parameters
@@ -42,22 +43,9 @@ class NatsBroker(Broker):
         Returns
         -------
         None
-
         """
-        self.nats_client = await connect(f"nats://{hostname}:{port}")
-        if self.nats_client.connected_url is not None:
-            logger.info(f"Connected to NATS at {self.nats_client.connected_url.netloc}")
-
-    async def createSubscription(self) -> None:
-        """Creates a subscription to a data stream.
-
-        Returns
-        -------
-        None
-
-        """
-        self.subscription = await self.nats_client.subscribe(self.topic_name)
-        logger.info(f"Subscribed to stream: {self.stream_id}")
+        self.kafka_producer = Producer({"bootstrap.servers": f"{hostname}:{port}"})
+        logger.info(f"Connected to Kafka at {hostname}:{port}")
 
     async def publish_event(self, event: Dict[str, Any]):
         """Publish an event to a connected data stream.
@@ -70,9 +58,9 @@ class NatsBroker(Broker):
         Returns
         -------
         None
-
         """
-        await self.nats_client.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
+        self.kafka_producer.produce(topic=self.topic_name, value=json.dumps(event).encode("utf-8"))
+        self.kafka_producer.flush()
 
     async def disconnect(self) -> None:
         """Closes the connection to the server.
@@ -81,15 +69,5 @@ class NatsBroker(Broker):
         -------
         None
         """
-        await self.nats_client.close()
+        self.kafka_producer.flush()
         logger.info(f"Stopped connection to stream: {self.stream_id}")
-
-    def get_message(self) -> AsyncIterator:
-        """Get the published messages of the subscription.
-
-        Returns
-        -------
-        message_iterator: AsyncIterator
-            An async iterator for the messages.
-        """
-        return self.subscription.messages
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/nats/__init__.py
similarity index 73%
copy from streampipes-client-python/streampipes/functions/broker/__init__.py
copy to streampipes-client-python/streampipes/functions/broker/nats/__init__.py
index 32d35fba3..cce3acad3 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/__init__.py
@@ -14,16 +14,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
-
-from .broker_handler import SupportedBroker, get_broker  # isort: skip
-
-__all__ = [
-    "Broker",
-    "KafkaBroker",
-    "NatsBroker",
-    "SupportedBroker",
-    "get_broker",
-]
diff --git a/streampipes-client-python/streampipes/functions/broker/nats_broker.py b/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
similarity index 70%
copy from streampipes-client-python/streampipes/functions/broker/nats_broker.py
copy to streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
index 238e14bfd..18427cb27 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
@@ -14,20 +14,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import json
 import logging
-from typing import Any, AsyncIterator, Dict
+from typing import AsyncIterator
 
 from nats import connect
-from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker import Consumer
 
 logger = logging.getLogger(__name__)
 
 
-class NatsBroker(Broker):
-    """Implementation of the NatsBroker"""
+class NatsConsumer(Consumer):
+    """Implementation of a consumer for NATS"""
 
-    async def _makeConnection(self, hostname: str, port: int) -> None:
+    async def _make_connection(self, hostname: str, port: int) -> None:
         """Helper function to connect to a server.
 
         Parameters
@@ -45,10 +44,9 @@ class NatsBroker(Broker):
 
         """
         self.nats_client = await connect(f"nats://{hostname}:{port}")
-        if self.nats_client.connected_url is not None:
-            logger.info(f"Connected to NATS at {self.nats_client.connected_url.netloc}")
+        logger.info(f"Connected to NATS at {hostname}:{port}")
 
-    async def createSubscription(self) -> None:
+    async def _create_subscription(self) -> None:
         """Creates a subscription to a data stream.
 
         Returns
@@ -59,21 +57,6 @@ class NatsBroker(Broker):
         self.subscription = await self.nats_client.subscribe(self.topic_name)
         logger.info(f"Subscribed to stream: {self.stream_id}")
 
-    async def publish_event(self, event: Dict[str, Any]):
-        """Publish an event to a connected data stream.
-
-        Parameters
-        ----------
-        event: Dict[str, Any]
-            The event to be published.
-
-        Returns
-        -------
-        None
-
-        """
-        await self.nats_client.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
-
     async def disconnect(self) -> None:
         """Closes the connection to the server.
 
diff --git a/streampipes-client-python/streampipes/functions/broker/nats_broker.py b/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
similarity index 66%
rename from streampipes-client-python/streampipes/functions/broker/nats_broker.py
rename to streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
index 238e14bfd..798c1940f 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
@@ -16,18 +16,18 @@
 #
 import json
 import logging
-from typing import Any, AsyncIterator, Dict
+from typing import Any, Dict
 
 from nats import connect
-from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker import Publisher
 
 logger = logging.getLogger(__name__)
 
 
-class NatsBroker(Broker):
-    """Implementation of the NatsBroker"""
+class NatsPublisher(Publisher):
+    """Implementation of a publisher for NATS"""
 
-    async def _makeConnection(self, hostname: str, port: int) -> None:
+    async def _make_connection(self, hostname: str, port: int) -> None:
         """Helper function to connect to a server.
 
         Parameters
@@ -45,19 +45,7 @@ class NatsBroker(Broker):
 
         """
         self.nats_client = await connect(f"nats://{hostname}:{port}")
-        if self.nats_client.connected_url is not None:
-            logger.info(f"Connected to NATS at {self.nats_client.connected_url.netloc}")
-
-    async def createSubscription(self) -> None:
-        """Creates a subscription to a data stream.
-
-        Returns
-        -------
-        None
-
-        """
-        self.subscription = await self.nats_client.subscribe(self.topic_name)
-        logger.info(f"Subscribed to stream: {self.stream_id}")
+        logger.info(f"Connected to NATS at {hostname}:{port}")
 
     async def publish_event(self, event: Dict[str, Any]):
         """Publish an event to a connected data stream.
@@ -83,13 +71,3 @@ class NatsBroker(Broker):
         """
         await self.nats_client.close()
         logger.info(f"Stopped connection to stream: {self.stream_id}")
-
-    def get_message(self) -> AsyncIterator:
-        """Get the published messages of the subscription.
-
-        Returns
-        -------
-        message_iterator: AsyncIterator
-            An async iterator for the messages.
-        """
-        return self.subscription.messages
diff --git a/streampipes-client-python/streampipes/functions/broker/output_collector.py b/streampipes-client-python/streampipes/functions/broker/output_collector.py
index fcae7d55e..a819539aa 100644
--- a/streampipes-client-python/streampipes/functions/broker/output_collector.py
+++ b/streampipes-client-python/streampipes/functions/broker/output_collector.py
@@ -17,7 +17,7 @@
 import asyncio
 from typing import Any, Coroutine, Dict
 
-from streampipes.functions.broker import get_broker
+from streampipes.functions.broker import Publisher, get_broker
 from streampipes.model.resource.data_stream import DataStream
 
 
@@ -32,14 +32,14 @@ class OutputCollector:
 
     Attributes
     ----------
-    broker: Broker
-        The broker instance that sends the data to StreamPipes
+    publisher: Publisher
+        The publisher instance that sends the data to StreamPipes
 
     """
 
     def __init__(self, data_stream: DataStream) -> None:
-        self.broker = get_broker(data_stream)
-        self._run_coroutine(self.broker.connect(data_stream))
+        self.publisher: Publisher = get_broker(data_stream, is_publisher=True)  # type: ignore
+        self._run_coroutine(self.publisher.connect(data_stream))
 
     def collect(self, event: Dict[str, Any]) -> None:
         """Publishes an event to the output stream.
@@ -53,7 +53,7 @@ class OutputCollector:
         -------
         None
         """
-        self._run_coroutine(self.broker.publish_event(event))
+        self._run_coroutine(self.publisher.publish_event(event))
 
     def disconnect(self) -> None:
         """Disconnects the broker of the output collector.
@@ -62,7 +62,7 @@ class OutputCollector:
         -------
         None
         """
-        self._run_coroutine(self.broker.disconnect())
+        self._run_coroutine(self.publisher.disconnect())
 
     @staticmethod
     def _run_coroutine(coroutine: Coroutine) -> None:
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/publisher.py
similarity index 55%
copy from streampipes-client-python/streampipes/functions/broker/__init__.py
copy to streampipes-client-python/streampipes/functions/broker/publisher.py
index 32d35fba3..bdcb0d8e8 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/publisher.py
@@ -14,16 +14,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
+from abc import abstractmethod
+from typing import Any, Dict
 
-from .broker_handler import SupportedBroker, get_broker  # isort: skip
+from streampipes.functions.broker import Broker
 
-__all__ = [
-    "Broker",
-    "KafkaBroker",
-    "NatsBroker",
-    "SupportedBroker",
-    "get_broker",
-]
+
+class Publisher(Broker):
+    """Abstract implementation of a publisher for a broker.
+
+    A publisher allows publishing events to a data stream.
+    """
+
+    @abstractmethod
+    async def publish_event(self, event: Dict[str, Any]) -> None:
+        """Publish an event to a connected data stream.
+
+        Parameters
+        ----------
+        event: Dict[str, Any]
+            The event to be published.
+
+        Returns
+        -------
+        None
+        """
+        raise NotImplementedError  # pragma: no cover
diff --git a/streampipes-client-python/streampipes/functions/function_handler.py b/streampipes-client-python/streampipes/functions/function_handler.py
index 83b88669b..deb2fc081 100644
--- a/streampipes-client-python/streampipes/functions/function_handler.py
+++ b/streampipes-client-python/streampipes/functions/function_handler.py
@@ -20,7 +20,7 @@ import logging
 from typing import AsyncIterator, Dict, List
 
 from streampipes.client.client import StreamPipesClient
-from streampipes.functions.broker import Broker, get_broker
+from streampipes.functions.broker import Broker, Consumer, get_broker
 from streampipes.functions.registration import Registration
 from streampipes.functions.utils.async_iter_handler import AsyncIterHandler
 from streampipes.functions.utils.data_stream_context import DataStreamContext
@@ -77,7 +77,7 @@ class FunctionHandler:
                 # Get the data stream schema from the API
                 data_stream: DataStream = self.client.dataStreamApi.get(stream_id)  # type: ignore
                 # Get the broker
-                broker = get_broker(data_stream)
+                broker: Consumer = get_broker(data_stream)  # type: ignore
                 # Assign the functions, broker and schema to every stream
                 if stream_id in self.stream_contexts.keys():
                     self.stream_contexts[stream_id].add_function(streampipes_function)
@@ -110,7 +110,6 @@ class FunctionHandler:
             broker = self.stream_contexts[stream_id].broker
             # Connect the broker
             await broker.connect(data_stream)
-            await broker.createSubscription()
             self.brokers.append(broker)
             # Get the messages
             messages[stream_id] = broker.get_message()
diff --git a/streampipes-client-python/streampipes/functions/utils/data_stream_context.py b/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
index 2132b5071..315a13cd0 100644
--- a/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
+++ b/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
@@ -16,7 +16,7 @@
 #
 from typing import List
 
-from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker import Consumer
 from streampipes.functions.streampipes_function import StreamPipesFunction
 from streampipes.model.resource.data_stream import DataStream
 
@@ -30,11 +30,11 @@ class DataStreamContext:
         StreamPipes Functions which require the data of this data stream.
     schema: DataStream
         The schema of this data stream.
-    broker: Broker
-        The broker to connect to this data stream.
+    broker: Consumer
+        The consumer to connect to this data stream.
     """
 
-    def __init__(self, functions: List[StreamPipesFunction], schema: DataStream, broker: Broker) -> None:
+    def __init__(self, functions: List[StreamPipesFunction], schema: DataStream, broker: Consumer) -> None:
         self.functions = functions
         self.schema = schema
         self.broker = broker
diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py
index 6b544f656..8181460ec 100644
--- a/streampipes-client-python/tests/functions/test_function_handler.py
+++ b/streampipes-client-python/tests/functions/test_function_handler.py
@@ -228,10 +228,10 @@ class TestFunctionHandler(TestCase):
             {"density": 3.6, "temperature": 30.4, "timestamp": 1670000007000},
         ]
 
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
     @patch("streampipes.client.client.Session", autospec=True)
     def test_function_handler(self, http_session: MagicMock, nats_broker: MagicMock, *args: Tuple[AsyncMock]):
         http_session_mock = MagicMock()
@@ -263,10 +263,10 @@ class TestFunctionHandler(TestCase):
         self.assertListEqual(test_function.data, self.test_stream_data1)
         self.assertTrue(test_function.stopped)
 
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
     @patch("streampipes.endpoint.endpoint.APIEndpoint.get", autospec=True)
     def test_function_handler_two_streams(self, endpoint: MagicMock, nats_broker: MagicMock, *args: Tuple[AsyncMock]):
         def get_stream(endpoint, stream_id):
@@ -318,12 +318,14 @@ class TestFunctionHandler(TestCase):
         self.assertListEqual(test_function2.data2, self.test_stream_data2)
         self.assertTrue(test_function2.stopped)
 
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher.disconnect", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher._make_connection", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
     @patch("streampipes.functions.streampipes_function.time", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.publish_event", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True)
     @patch("streampipes.client.client.Session", autospec=True)
     def test_function_output_stream(
         self,
diff --git a/streampipes-client-python/tests/functions/test_river_function.py b/streampipes-client-python/tests/functions/test_river_function.py
index a7035d003..a850501ef 100644
--- a/streampipes-client-python/tests/functions/test_river_function.py
+++ b/streampipes-client-python/tests/functions/test_river_function.py
@@ -67,12 +67,14 @@ class TestRiverFunction(TestCase):
             {"number": 10.6, "bool": False, "timestamp": 1670000005000},
         ]
 
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher.disconnect", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher._make_connection", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
     @patch("streampipes.functions.streampipes_function.time", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.publish_event", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True)
     @patch("streampipes.client.client.Session", autospec=True)
     def test_river_function_unsupervised(
         self,
@@ -130,12 +132,14 @@ class TestRiverFunction(TestCase):
 
         self.assertListEqual(model.data_x, self.test_stream_data)
 
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher.disconnect", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher._make_connection", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
     @patch("streampipes.functions.streampipes_function.time", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
-    @patch("streampipes.functions.broker.nats_broker.NatsBroker.publish_event", autospec=True)
+    @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
+    @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True)
     @patch("streampipes.client.client.Session", autospec=True)
     def test_river_function_supervised(
         self,