You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/02/28 15:01:19 UTC

[streampipes] branch improve-python-client updated: Support Kafka as messaging protocol in Python client (#1362)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch improve-python-client
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/improve-python-client by this push:
     new f2fbac713 Support Kafka as messaging protocol in Python client (#1362)
f2fbac713 is described below

commit f2fbac7136af023bd07e27108926a546d980e755
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Feb 28 16:00:51 2023 +0100

    Support Kafka as messaging protocol in Python client (#1362)
---
 streampipes-client-python/setup.py                 |  1 +
 .../streampipes/functions/broker/__init__.py       |  2 +
 .../streampipes/functions/broker/broker_handler.py |  5 +-
 .../streampipes/functions/broker/kafka_broker.py   | 98 ++++++++++++++++++++++
 .../functions/broker/kafka_message_fetcher.py      | 33 ++++++++
 5 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py
index e135fd33f..5a5d1f8af 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -35,6 +35,7 @@ base_packages = [
     "pydantic>=1.10.2",
     "requests>=2.28.1",
     "nats-py>=2.2.0",
+    "confluent-kafka>=2.0.2"
 ]
 
 dev_packages = base_packages + [
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/__init__.py
index 45ffbdc7c..a7bdb410e 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/__init__.py
@@ -16,11 +16,13 @@
 #
 from .broker import Broker
 from .nats_broker import NatsBroker
+from .kafka_broker import KafkaBroker
 
 from .broker_handler import SupportedBroker, get_broker  # isort: skip
 
 __all__ = [
     "Broker",
+    "KafkaBroker",
     "NatsBroker",
     "SupportedBroker",
     "get_broker",
diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 2c05b839d..3aac0f986 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -16,7 +16,7 @@
 #
 from enum import Enum
 
-from streampipes.functions.broker import Broker, NatsBroker
+from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker
 from streampipes.model.resource.data_stream import DataStream
 
 
@@ -24,6 +24,7 @@ class SupportedBroker(Enum):
     """Enum for the supported brokers."""
 
     NATS = "NatsTransportProtocol"
+    KAFKA = "KafkaTransportProtocol"
 
 
 # TODO Exception should be removed once all brokers are implemented.
@@ -49,5 +50,7 @@ def get_broker(data_stream: DataStream) -> Broker:  # TODO implementation for mo
     broker_name = data_stream.event_grounding.transport_protocols[0].class_name
     if SupportedBroker.NATS.value in broker_name:
         return NatsBroker()
+    elif SupportedBroker.KAFKA.value in broker_name:
+        return KafkaBroker()
     else:
         raise UnsupportedBroker(f'The python client doesn\'t include the broker: "{broker_name}" yet')
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
new file mode 100644
index 000000000..5b5f6ae94
--- /dev/null
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+from typing import Any, AsyncIterator, Dict
+
+from confluent_kafka import Consumer
+from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
+from streampipes.model.common import random_letters
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaBroker(Broker):
+    """Implementation of the NatsBroker"""
+
+    async def _makeConnection(self, hostname: str, port: int) -> None:
+        """Helper function to connect to a server.
+
+        Parameters
+        ----------
+
+        hostname: str
+            The hostname of the of the server, which the broker connects to.
+
+        port: int
+            The port number of the connection.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer = Consumer({
+            'bootstrap.servers': f"{hostname}:{port}",
+            'group.id': random_letters(6),
+            'auto.offset.reset': 'latest'
+        })
+
+    async def createSubscription(self) -> None:
+        """Creates a subscription to a data stream.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer.subscribe([self.topic_name])
+
+        logger.info(f"Subscribed to stream: {self.stream_id}")
+
+    async def publish_event(self, event: Dict[str, Any]):
+        """Publish an event to a connected data stream.
+
+        Parameters
+        ----------
+         event: Dict[str, Any]
+            The event to be published.
+
+        Returns
+        -------
+        None
+        """
+
+        #await self.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
+
+    async def disconnect(self) -> None:
+        """Closes the connection to the server.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer.close()
+        logger.info(f"Stopped connection to stream: {self.stream_id}")
+
+    def get_message(self) -> AsyncIterator:
+        """Get the published messages of the subscription.
+
+        Returns
+        -------
+        An async iterator for the messages.
+        """
+
+        return KafkaMessageFetcher(self.kafka_consumer)
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
new file mode 100644
index 000000000..8ca6b3142
--- /dev/null
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
@@ -0,0 +1,33 @@
+from confluent_kafka import Consumer
+
+
+class KafkaMsg:
+    """An internal representation of a Kafka message
+
+        Parameters
+        ----------
+        data: Byte Array
+            The received Kafka message as byte array
+    """
+    def __init__(self, data):
+        self.data = data
+
+
+class KafkaMessageFetcher:
+    """Fetches the next message from Kafka
+
+        Parameters
+        ----------
+        consumer: Consumer
+            The Kafka consumer
+    """
+    def __init__(self, consumer: Consumer):
+        self.consumer = consumer
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        msg = self.consumer.poll(0.1)
+        return KafkaMsg(msg.value())
+