You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/02/28 15:01:19 UTC
[streampipes] branch improve-python-client updated: Support Kafka as messaging protocol in Python client (#1362)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch improve-python-client
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/improve-python-client by this push:
new f2fbac713 Support Kafka as messaging protocol in Python client (#1362)
f2fbac713 is described below
commit f2fbac7136af023bd07e27108926a546d980e755
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Feb 28 16:00:51 2023 +0100
Support Kafka as messaging protocol in Python client (#1362)
---
streampipes-client-python/setup.py | 1 +
.../streampipes/functions/broker/__init__.py | 2 +
.../streampipes/functions/broker/broker_handler.py | 5 +-
.../streampipes/functions/broker/kafka_broker.py | 98 ++++++++++++++++++++++
.../functions/broker/kafka_message_fetcher.py | 33 ++++++++
5 files changed, 138 insertions(+), 1 deletion(-)
diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py
index e135fd33f..5a5d1f8af 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -35,6 +35,7 @@ base_packages = [
"pydantic>=1.10.2",
"requests>=2.28.1",
"nats-py>=2.2.0",
+ "confluent-kafka>=2.0.2"
]
dev_packages = base_packages + [
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/__init__.py
index 45ffbdc7c..a7bdb410e 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/__init__.py
@@ -16,11 +16,13 @@
#
from .broker import Broker
from .nats_broker import NatsBroker
+from .kafka_broker import KafkaBroker
from .broker_handler import SupportedBroker, get_broker # isort: skip
__all__ = [
"Broker",
+ "KafkaBroker",
"NatsBroker",
"SupportedBroker",
"get_broker",
diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 2c05b839d..3aac0f986 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -16,7 +16,7 @@
#
from enum import Enum
-from streampipes.functions.broker import Broker, NatsBroker
+from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker
from streampipes.model.resource.data_stream import DataStream
@@ -24,6 +24,7 @@ class SupportedBroker(Enum):
"""Enum for the supported brokers."""
NATS = "NatsTransportProtocol"
+ KAFKA = "KafkaTransportProtocol"
# TODO Exception should be removed once all brokers are implemented.
@@ -49,5 +50,7 @@ def get_broker(data_stream: DataStream) -> Broker: # TODO implementation for mo
broker_name = data_stream.event_grounding.transport_protocols[0].class_name
if SupportedBroker.NATS.value in broker_name:
return NatsBroker()
+ elif SupportedBroker.KAFKA.value in broker_name:
+ return KafkaBroker()
else:
raise UnsupportedBroker(f'The python client doesn\'t include the broker: "{broker_name}" yet')
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
new file mode 100644
index 000000000..5b5f6ae94
--- /dev/null
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+from typing import Any, AsyncIterator, Dict
+
+from confluent_kafka import Consumer
+from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
+from streampipes.model.common import random_letters
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaBroker(Broker):
+ """Implementation of the NatsBroker"""
+
+ async def _makeConnection(self, hostname: str, port: int) -> None:
+ """Helper function to connect to a server.
+
+ Parameters
+ ----------
+
+ hostname: str
+ The hostname of the of the server, which the broker connects to.
+
+ port: int
+ The port number of the connection.
+
+ Returns
+ -------
+ None
+ """
+ self.kafka_consumer = Consumer({
+ 'bootstrap.servers': f"{hostname}:{port}",
+ 'group.id': random_letters(6),
+ 'auto.offset.reset': 'latest'
+ })
+
+ async def createSubscription(self) -> None:
+ """Creates a subscription to a data stream.
+
+ Returns
+ -------
+ None
+ """
+ self.kafka_consumer.subscribe([self.topic_name])
+
+ logger.info(f"Subscribed to stream: {self.stream_id}")
+
+ async def publish_event(self, event: Dict[str, Any]):
+ """Publish an event to a connected data stream.
+
+ Parameters
+ ----------
+ event: Dict[str, Any]
+ The event to be published.
+
+ Returns
+ -------
+ None
+ """
+
+ #await self.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
+
+ async def disconnect(self) -> None:
+ """Closes the connection to the server.
+
+ Returns
+ -------
+ None
+ """
+ self.kafka_consumer.close()
+ logger.info(f"Stopped connection to stream: {self.stream_id}")
+
+ def get_message(self) -> AsyncIterator:
+ """Get the published messages of the subscription.
+
+ Returns
+ -------
+ An async iterator for the messages.
+ """
+
+ return KafkaMessageFetcher(self.kafka_consumer)
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
new file mode 100644
index 000000000..8ca6b3142
--- /dev/null
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
@@ -0,0 +1,33 @@
+from confluent_kafka import Consumer
+
+
+class KafkaMsg:
+ """An internal representation of a Kafka message
+
+ Parameters
+ ----------
+ data: Byte Array
+ The received Kafka message as byte array
+ """
+ def __init__(self, data):
+ self.data = data
+
+
+class KafkaMessageFetcher:
+ """Fetches the next message from Kafka
+
+ Parameters
+ ----------
+ consumer: Consumer
+ The Kafka consumer
+ """
+ def __init__(self, consumer: Consumer):
+ self.consumer = consumer
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ msg = self.consumer.poll(0.1)
+ return KafkaMsg(msg.value())
+