You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/04/04 15:10:05 UTC
[streampipes] branch 1478-distinction-between-consumer-and-publisher updated: Add consumer and publisher
This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch 1478-distinction-between-consumer-and-publisher
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1478-distinction-between-consumer-and-publisher by this push:
new 2c7aee594 Add consumer and publisher
2c7aee594 is described below
commit 2c7aee59482e0f308859b979da003c58747e60cd
Author: Sven Oehler <oe...@web.de>
AuthorDate: Tue Apr 4 17:09:42 2023 +0200
Add consumer and publisher
---
.../streampipes/functions/broker/__init__.py | 19 +++++--
.../streampipes/functions/broker/broker.py | 45 ++-------------
.../streampipes/functions/broker/broker_handler.py | 20 +++++--
.../streampipes/functions/broker/consumer.py | 65 ++++++++++++++++++++++
.../functions/broker/{ => kafka}/__init__.py | 13 -----
.../{kafka_broker.py => kafka/kafka_consumer.py} | 36 ++++--------
.../broker/{ => kafka}/kafka_message_fetcher.py | 0
.../{nats_broker.py => kafka/kafka_publisher.py} | 46 ++++-----------
.../functions/broker/{ => nats}/__init__.py | 13 -----
.../{nats_broker.py => nats/nats_consumer.py} | 31 +++--------
.../{nats_broker.py => nats/nats_publisher.py} | 34 ++---------
.../functions/broker/output_collector.py | 14 ++---
.../functions/broker/{__init__.py => publisher.py} | 35 ++++++++----
.../streampipes/functions/function_handler.py | 5 +-
.../functions/utils/data_stream_context.py | 8 +--
.../tests/functions/test_function_handler.py | 28 +++++-----
.../tests/functions/test_river_function.py | 24 ++++----
17 files changed, 201 insertions(+), 235 deletions(-)
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/__init__.py
index 32d35fba3..687b55b36 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/__init__.py
@@ -15,15 +15,26 @@
# limitations under the License.
#
from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
+from .consumer import Consumer
+from .publisher import Publisher
+
+# isort: split
+
+from .kafka.kafka_consumer import KafkaConsumer
+from .kafka.kafka_publisher import KafkaPublisher
+from .nats.nats_consumer import NatsConsumer
+from .nats.nats_publisher import NatsPublisher
from .broker_handler import SupportedBroker, get_broker # isort: skip
__all__ = [
"Broker",
- "KafkaBroker",
- "NatsBroker",
+ "Consumer",
+ "Publisher",
"SupportedBroker",
"get_broker",
+ "KafkaConsumer",
+ "KafkaPublisher",
+ "NatsConsumer",
+ "NatsPublisher",
]
diff --git a/streampipes-client-python/streampipes/functions/broker/broker.py b/streampipes-client-python/streampipes/functions/broker/broker.py
index 7697bda11..53b1c075c 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker.py
@@ -16,15 +16,14 @@
#
import os
from abc import ABC, abstractmethod
-from typing import Any, AsyncIterator, Dict
from streampipes.model.resource.data_stream import DataStream
class Broker(ABC):
- """Abstract implementation of a broker.
+ """Abstract implementation of a broker for consumer and publisher.
- A broker allows both to subscribe to a data stream and to publish events to a data stream.
+ It contains the basic logic to connect to a data stream.
"""
async def connect(self, data_stream: DataStream) -> None:
@@ -45,10 +44,10 @@ class Broker(ABC):
hostname = transport_protocol.broker_hostname
if "BROKER-HOST" in os.environ.keys():
hostname = os.environ["BROKER-HOST"]
- await self._makeConnection(hostname, transport_protocol.port)
+ await self._make_connection(hostname, transport_protocol.port)
@abstractmethod
- async def _makeConnection(self, hostname: str, port: int) -> None:
+ async def _make_connection(self, hostname: str, port: int) -> None:
"""Helper function to connect to a server.
Parameters
@@ -64,31 +63,6 @@ class Broker(ABC):
"""
raise NotImplementedError # pragma: no cover
- @abstractmethod
- async def createSubscription(self) -> None:
- """Creates a subscription to a data stream.
-
- Returns
- -------
- None
- """
- raise NotImplementedError # pragma: no cover
-
- @abstractmethod
- async def publish_event(self, event: Dict[str, Any]) -> None:
- """Publish an event to a connected data stream.
-
- Parameters
- ----------
- event: Dict[str, Any]
- The event to be published.
-
- Returns
- -------
- None
- """
- raise NotImplementedError # pragma: no cover
-
@abstractmethod
async def disconnect(self) -> None:
"""Closes the connection to the server.
@@ -98,14 +72,3 @@ class Broker(ABC):
None
"""
raise NotImplementedError # pragma: no cover
-
- @abstractmethod
- def get_message(self) -> AsyncIterator:
- """Get the published messages of the subscription.
-
- Returns
- -------
- iterator: AsyncIterator
- An async iterator for the messages.
- """
- raise NotImplementedError # pragma: no cover
diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 6bf070486..7cfe78f7c 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -16,7 +16,13 @@
#
from enum import Enum
-from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker
+from streampipes.functions.broker import (
+ Broker,
+ KafkaConsumer,
+ KafkaPublisher,
+ NatsConsumer,
+ NatsPublisher,
+)
from streampipes.model.resource.data_stream import DataStream
@@ -35,7 +41,9 @@ class UnsupportedBrokerError(Exception):
super().__init__(message)
-def get_broker(data_stream: DataStream) -> Broker: # TODO implementation for more transport_protocols
+def get_broker(
+ data_stream: DataStream, is_publisher: bool = False
+) -> Broker: # TODO implementation for more transport_protocols
"""Derive the broker for the given data stream.
Parameters
@@ -55,8 +63,12 @@ def get_broker(data_stream: DataStream) -> Broker: # TODO implementation for mo
"""
broker_name = data_stream.event_grounding.transport_protocols[0].class_name
if SupportedBroker.NATS.value in broker_name:
- return NatsBroker()
+ if is_publisher:
+ return NatsPublisher()
+ return NatsConsumer()
elif SupportedBroker.KAFKA.value in broker_name:
- return KafkaBroker()
+ if is_publisher:
+ return KafkaPublisher()
+ return KafkaConsumer()
else:
raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')
diff --git a/streampipes-client-python/streampipes/functions/broker/consumer.py b/streampipes-client-python/streampipes/functions/broker/consumer.py
new file mode 100644
index 000000000..03e988504
--- /dev/null
+++ b/streampipes-client-python/streampipes/functions/broker/consumer.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import abstractmethod
+from typing import AsyncIterator
+
+from streampipes.functions.broker import Broker
+from streampipes.model.resource.data_stream import DataStream
+
+
+class Consumer(Broker):
+ """Abstract implementation of a consumer for a broker.
+
+ A consumer allows to subscribe to a data stream.
+ """
+
+ async def connect(self, data_stream: DataStream) -> None:
+ """Connects to the broker running in StreamPipes and creates a subscription.
+
+ Parameters
+ ----------
+ data_stream: DataStream
+ Contains the meta information (resources) for a data stream.
+
+ Returns
+ -------
+ None
+ """
+ await super().connect(data_stream)
+ await self._create_subscription()
+
+ @abstractmethod
+ async def _create_subscription(self) -> None:
+ """Creates a subscription to a data stream.
+
+ Returns
+ -------
+ None
+
+ """
+ raise NotImplementedError # pragma: no cover
+
+ @abstractmethod
+ def get_message(self) -> AsyncIterator:
+ """Get the published messages of the subscription.
+
+ Returns
+ -------
+ iterator: AsyncIterator
+ An async iterator for the messages.
+ """
+ raise NotImplementedError # pragma: no cover
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/kafka/__init__.py
similarity index 73%
copy from streampipes-client-python/streampipes/functions/broker/__init__.py
copy to streampipes-client-python/streampipes/functions/broker/kafka/__init__.py
index 32d35fba3..cce3acad3 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/__init__.py
@@ -14,16 +14,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
-
-from .broker_handler import SupportedBroker, get_broker # isort: skip
-
-__all__ = [
- "Broker",
- "KafkaBroker",
- "NatsBroker",
- "SupportedBroker",
- "get_broker",
-]
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
similarity index 71%
rename from streampipes-client-python/streampipes/functions/broker/kafka_broker.py
rename to streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
index 9bc1337a2..18bb97776 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
@@ -16,20 +16,20 @@
#
import logging
-from typing import Any, AsyncIterator, Dict
+from typing import AsyncIterator
-from confluent_kafka import Consumer # type: ignore
-from streampipes.functions.broker.broker import Broker
-from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
+from confluent_kafka import Consumer as KafkaConnection # type: ignore
+from streampipes.functions.broker import Consumer
+from streampipes.functions.broker.kafka.kafka_message_fetcher import KafkaMessageFetcher
from streampipes.model.common import random_letters
logger = logging.getLogger(__name__)
-class KafkaBroker(Broker):
- """Implementation of the broker for Kafka"""
+class KafkaConsumer(Consumer):
+ """Implementation of a consumer for Kafka"""
- async def _makeConnection(self, hostname: str, port: int) -> None:
+ async def _make_connection(self, hostname: str, port: int) -> None:
"""Helper function to connect to a server.
Parameters
@@ -45,11 +45,12 @@ class KafkaBroker(Broker):
-------
None
"""
- self.kafka_consumer = Consumer(
+ self.kafka_consumer = KafkaConnection(
{"bootstrap.servers": f"{hostname}:{port}", "group.id": random_letters(6), "auto.offset.reset": "latest"}
)
+ logger.info(f"Connected to Kafka at {hostname}:{port}")
- async def createSubscription(self) -> None:
+ async def _create_subscription(self) -> None:
"""Creates a subscription to a data stream.
Returns
@@ -57,24 +58,8 @@ class KafkaBroker(Broker):
None
"""
self.kafka_consumer.subscribe([self.topic_name])
-
logger.info(f"Subscribed to stream: {self.stream_id}")
- async def publish_event(self, event: Dict[str, Any]):
- """Publish an event to a connected data stream.
-
- Parameters
- ----------
- event: Dict[str, Any]
- The event to be published.
-
- Returns
- -------
- None
- """
-
- # await self.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
-
async def disconnect(self) -> None:
"""Closes the connection to the server.
@@ -93,5 +78,4 @@ class KafkaBroker(Broker):
iterator: AsyncIterator
An async iterator for the messages.
"""
-
return KafkaMessageFetcher(self.kafka_consumer)
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
similarity index 100%
rename from streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
rename to streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
diff --git a/streampipes-client-python/streampipes/functions/broker/nats_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
similarity index 57%
copy from streampipes-client-python/streampipes/functions/broker/nats_broker.py
copy to streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
index 238e14bfd..8e8643e66 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
@@ -14,20 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
import json
import logging
-from typing import Any, AsyncIterator, Dict
+from typing import Any, Dict
-from nats import connect
-from streampipes.functions.broker.broker import Broker
+from confluent_kafka import Producer # type: ignore
+from streampipes.functions.broker import Publisher
logger = logging.getLogger(__name__)
-class NatsBroker(Broker):
- """Implementation of the NatsBroker"""
+class KafkaPublisher(Publisher):
+ """Implementation of a publisher for Kafka"""
- async def _makeConnection(self, hostname: str, port: int) -> None:
+ async def _make_connection(self, hostname: str, port: int) -> None:
"""Helper function to connect to a server.
Parameters
@@ -42,22 +43,9 @@ class NatsBroker(Broker):
Returns
-------
None
-
"""
- self.nats_client = await connect(f"nats://{hostname}:{port}")
- if self.nats_client.connected_url is not None:
- logger.info(f"Connected to NATS at {self.nats_client.connected_url.netloc}")
-
- async def createSubscription(self) -> None:
- """Creates a subscription to a data stream.
-
- Returns
- -------
- None
-
- """
- self.subscription = await self.nats_client.subscribe(self.topic_name)
- logger.info(f"Subscribed to stream: {self.stream_id}")
+ self.kafka_producer = Producer({"bootstrap.servers": f"{hostname}:{port}"})
+ logger.info(f"Connected to Kafka at {hostname}:{port}")
async def publish_event(self, event: Dict[str, Any]):
"""Publish an event to a connected data stream.
@@ -70,9 +58,9 @@ class NatsBroker(Broker):
Returns
-------
None
-
"""
- await self.nats_client.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
+ self.kafka_producer.produce(topic=self.topic_name, value=json.dumps(event).encode("utf-8"))
+ self.kafka_producer.flush()
async def disconnect(self) -> None:
"""Closes the connection to the server.
@@ -81,15 +69,5 @@ class NatsBroker(Broker):
-------
None
"""
- await self.nats_client.close()
+ self.kafka_producer.flush()
logger.info(f"Stopped connection to stream: {self.stream_id}")
-
- def get_message(self) -> AsyncIterator:
- """Get the published messages of the subscription.
-
- Returns
- -------
- message_iterator: AsyncIterator
- An async iterator for the messages.
- """
- return self.subscription.messages
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/nats/__init__.py
similarity index 73%
copy from streampipes-client-python/streampipes/functions/broker/__init__.py
copy to streampipes-client-python/streampipes/functions/broker/nats/__init__.py
index 32d35fba3..cce3acad3 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/__init__.py
@@ -14,16 +14,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
-
-from .broker_handler import SupportedBroker, get_broker # isort: skip
-
-__all__ = [
- "Broker",
- "KafkaBroker",
- "NatsBroker",
- "SupportedBroker",
- "get_broker",
-]
diff --git a/streampipes-client-python/streampipes/functions/broker/nats_broker.py b/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
similarity index 70%
copy from streampipes-client-python/streampipes/functions/broker/nats_broker.py
copy to streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
index 238e14bfd..18427cb27 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
@@ -14,20 +14,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import json
import logging
-from typing import Any, AsyncIterator, Dict
+from typing import AsyncIterator
from nats import connect
-from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker import Consumer
logger = logging.getLogger(__name__)
-class NatsBroker(Broker):
- """Implementation of the NatsBroker"""
+class NatsConsumer(Consumer):
+ """Implementation of a consumer for NATS"""
- async def _makeConnection(self, hostname: str, port: int) -> None:
+ async def _make_connection(self, hostname: str, port: int) -> None:
"""Helper function to connect to a server.
Parameters
@@ -45,10 +44,9 @@ class NatsBroker(Broker):
"""
self.nats_client = await connect(f"nats://{hostname}:{port}")
- if self.nats_client.connected_url is not None:
- logger.info(f"Connected to NATS at {self.nats_client.connected_url.netloc}")
+ logger.info(f"Connected to NATS at {hostname}:{port}")
- async def createSubscription(self) -> None:
+ async def _create_subscription(self) -> None:
"""Creates a subscription to a data stream.
Returns
@@ -59,21 +57,6 @@ class NatsBroker(Broker):
self.subscription = await self.nats_client.subscribe(self.topic_name)
logger.info(f"Subscribed to stream: {self.stream_id}")
- async def publish_event(self, event: Dict[str, Any]):
- """Publish an event to a connected data stream.
-
- Parameters
- ----------
- event: Dict[str, Any]
- The event to be published.
-
- Returns
- -------
- None
-
- """
- await self.nats_client.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
-
async def disconnect(self) -> None:
"""Closes the connection to the server.
diff --git a/streampipes-client-python/streampipes/functions/broker/nats_broker.py b/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
similarity index 66%
rename from streampipes-client-python/streampipes/functions/broker/nats_broker.py
rename to streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
index 238e14bfd..798c1940f 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats_broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
@@ -16,18 +16,18 @@
#
import json
import logging
-from typing import Any, AsyncIterator, Dict
+from typing import Any, Dict
from nats import connect
-from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker import Publisher
logger = logging.getLogger(__name__)
-class NatsBroker(Broker):
- """Implementation of the NatsBroker"""
+class NatsPublisher(Publisher):
+ """Implementation of a publisher for NATS"""
- async def _makeConnection(self, hostname: str, port: int) -> None:
+ async def _make_connection(self, hostname: str, port: int) -> None:
"""Helper function to connect to a server.
Parameters
@@ -45,19 +45,7 @@ class NatsBroker(Broker):
"""
self.nats_client = await connect(f"nats://{hostname}:{port}")
- if self.nats_client.connected_url is not None:
- logger.info(f"Connected to NATS at {self.nats_client.connected_url.netloc}")
-
- async def createSubscription(self) -> None:
- """Creates a subscription to a data stream.
-
- Returns
- -------
- None
-
- """
- self.subscription = await self.nats_client.subscribe(self.topic_name)
- logger.info(f"Subscribed to stream: {self.stream_id}")
+ logger.info(f"Connected to NATS at {hostname}:{port}")
async def publish_event(self, event: Dict[str, Any]):
"""Publish an event to a connected data stream.
@@ -83,13 +71,3 @@ class NatsBroker(Broker):
"""
await self.nats_client.close()
logger.info(f"Stopped connection to stream: {self.stream_id}")
-
- def get_message(self) -> AsyncIterator:
- """Get the published messages of the subscription.
-
- Returns
- -------
- message_iterator: AsyncIterator
- An async iterator for the messages.
- """
- return self.subscription.messages
diff --git a/streampipes-client-python/streampipes/functions/broker/output_collector.py b/streampipes-client-python/streampipes/functions/broker/output_collector.py
index fcae7d55e..a819539aa 100644
--- a/streampipes-client-python/streampipes/functions/broker/output_collector.py
+++ b/streampipes-client-python/streampipes/functions/broker/output_collector.py
@@ -17,7 +17,7 @@
import asyncio
from typing import Any, Coroutine, Dict
-from streampipes.functions.broker import get_broker
+from streampipes.functions.broker import Publisher, get_broker
from streampipes.model.resource.data_stream import DataStream
@@ -32,14 +32,14 @@ class OutputCollector:
Attributes
----------
- broker: Broker
- The broker instance that sends the data to StreamPipes
+ publisher: Publisher
+ The publisher instance that sends the data to StreamPipes
"""
def __init__(self, data_stream: DataStream) -> None:
- self.broker = get_broker(data_stream)
- self._run_coroutine(self.broker.connect(data_stream))
+ self.publisher: Publisher = get_broker(data_stream, is_publisher=True) # type: ignore
+ self._run_coroutine(self.publisher.connect(data_stream))
def collect(self, event: Dict[str, Any]) -> None:
"""Publishes an event to the output stream.
@@ -53,7 +53,7 @@ class OutputCollector:
-------
None
"""
- self._run_coroutine(self.broker.publish_event(event))
+ self._run_coroutine(self.publisher.publish_event(event))
def disconnect(self) -> None:
"""Disconnects the broker of the output collector.
@@ -62,7 +62,7 @@ class OutputCollector:
-------
None
"""
- self._run_coroutine(self.broker.disconnect())
+ self._run_coroutine(self.publisher.disconnect())
@staticmethod
def _run_coroutine(coroutine: Coroutine) -> None:
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/publisher.py
similarity index 55%
copy from streampipes-client-python/streampipes/functions/broker/__init__.py
copy to streampipes-client-python/streampipes/functions/broker/publisher.py
index 32d35fba3..bdcb0d8e8 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/publisher.py
@@ -14,16 +14,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
+from abc import abstractmethod
+from typing import Any, Dict
-from .broker_handler import SupportedBroker, get_broker # isort: skip
+from streampipes.functions.broker import Broker
-__all__ = [
- "Broker",
- "KafkaBroker",
- "NatsBroker",
- "SupportedBroker",
- "get_broker",
-]
+
+class Publisher(Broker):
+ """Abstract implementation of a publisher for a broker.
+
+ A publisher allows to publish events to a data stream.
+ """
+
+ @abstractmethod
+ async def publish_event(self, event: Dict[str, Any]) -> None:
+ """Publish an event to a connected data stream.
+
+ Parameters
+ ----------
+ event: Dict[str, Any]
+ The event to be published.
+
+ Returns
+ -------
+ None
+ """
+ raise NotImplementedError # pragma: no cover
diff --git a/streampipes-client-python/streampipes/functions/function_handler.py b/streampipes-client-python/streampipes/functions/function_handler.py
index 83b88669b..deb2fc081 100644
--- a/streampipes-client-python/streampipes/functions/function_handler.py
+++ b/streampipes-client-python/streampipes/functions/function_handler.py
@@ -20,7 +20,7 @@ import logging
from typing import AsyncIterator, Dict, List
from streampipes.client.client import StreamPipesClient
-from streampipes.functions.broker import Broker, get_broker
+from streampipes.functions.broker import Broker, Consumer, get_broker
from streampipes.functions.registration import Registration
from streampipes.functions.utils.async_iter_handler import AsyncIterHandler
from streampipes.functions.utils.data_stream_context import DataStreamContext
@@ -77,7 +77,7 @@ class FunctionHandler:
# Get the data stream schema from the API
data_stream: DataStream = self.client.dataStreamApi.get(stream_id) # type: ignore
# Get the broker
- broker = get_broker(data_stream)
+ broker: Consumer = get_broker(data_stream) # type: ignore
# Assign the functions, broker and schema to every stream
if stream_id in self.stream_contexts.keys():
self.stream_contexts[stream_id].add_function(streampipes_function)
@@ -110,7 +110,6 @@ class FunctionHandler:
broker = self.stream_contexts[stream_id].broker
# Connect the broker
await broker.connect(data_stream)
- await broker.createSubscription()
self.brokers.append(broker)
# Get the messages
messages[stream_id] = broker.get_message()
diff --git a/streampipes-client-python/streampipes/functions/utils/data_stream_context.py b/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
index 2132b5071..315a13cd0 100644
--- a/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
+++ b/streampipes-client-python/streampipes/functions/utils/data_stream_context.py
@@ -16,7 +16,7 @@
#
from typing import List
-from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker import Consumer
from streampipes.functions.streampipes_function import StreamPipesFunction
from streampipes.model.resource.data_stream import DataStream
@@ -30,11 +30,11 @@ class DataStreamContext:
StreamPipes Functions which require the data of this data stream.
schema: DataStream
The schema of this data stream.
- broker: Broker
- The broker to connect to this data stream.
+ broker: Consumer
+ The consumer to connect to this data stream.
"""
- def __init__(self, functions: List[StreamPipesFunction], schema: DataStream, broker: Broker) -> None:
+ def __init__(self, functions: List[StreamPipesFunction], schema: DataStream, broker: Consumer) -> None:
self.functions = functions
self.schema = schema
self.broker = broker
diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py
index 6b544f656..8181460ec 100644
--- a/streampipes-client-python/tests/functions/test_function_handler.py
+++ b/streampipes-client-python/tests/functions/test_function_handler.py
@@ -228,10 +228,10 @@ class TestFunctionHandler(TestCase):
{"density": 3.6, "temperature": 30.4, "timestamp": 1670000007000},
]
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
@patch("streampipes.client.client.Session", autospec=True)
def test_function_handler(self, http_session: MagicMock, nats_broker: MagicMock, *args: Tuple[AsyncMock]):
http_session_mock = MagicMock()
@@ -263,10 +263,10 @@ class TestFunctionHandler(TestCase):
self.assertListEqual(test_function.data, self.test_stream_data1)
self.assertTrue(test_function.stopped)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
@patch("streampipes.endpoint.endpoint.APIEndpoint.get", autospec=True)
def test_function_handler_two_streams(self, endpoint: MagicMock, nats_broker: MagicMock, *args: Tuple[AsyncMock]):
def get_stream(endpoint, stream_id):
@@ -318,12 +318,14 @@ class TestFunctionHandler(TestCase):
self.assertListEqual(test_function2.data2, self.test_stream_data2)
self.assertTrue(test_function2.stopped)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher.disconnect", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher._make_connection", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
@patch("streampipes.functions.streampipes_function.time", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.publish_event", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True)
@patch("streampipes.client.client.Session", autospec=True)
def test_function_output_stream(
self,
diff --git a/streampipes-client-python/tests/functions/test_river_function.py b/streampipes-client-python/tests/functions/test_river_function.py
index a7035d003..a850501ef 100644
--- a/streampipes-client-python/tests/functions/test_river_function.py
+++ b/streampipes-client-python/tests/functions/test_river_function.py
@@ -67,12 +67,14 @@ class TestRiverFunction(TestCase):
{"number": 10.6, "bool": False, "timestamp": 1670000005000},
]
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher.disconnect", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher._make_connection", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
@patch("streampipes.functions.streampipes_function.time", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.publish_event", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True)
@patch("streampipes.client.client.Session", autospec=True)
def test_river_function_unsupervised(
self,
@@ -130,12 +132,14 @@ class TestRiverFunction(TestCase):
self.assertListEqual(model.data_x, self.test_stream_data)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.disconnect", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.createSubscription", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker._makeConnection", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher.disconnect", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher._make_connection", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.disconnect", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._make_connection", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer._create_subscription", autospec=True)
@patch("streampipes.functions.streampipes_function.time", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.get_message", autospec=True)
- @patch("streampipes.functions.broker.nats_broker.NatsBroker.publish_event", autospec=True)
+ @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True)
+ @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True)
@patch("streampipes.client.client.Session", autospec=True)
def test_river_function_supervised(
self,