You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/03/11 12:59:20 UTC

[streampipes] branch dev updated: Add support for the Kafka broker in Python (#1363)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7fe524e47 Add support for the Kafka broker in Python (#1363)
7fe524e47 is described below

commit 7fe524e475e404a9289b2ca39e6d31c6c71aa67b
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sat Mar 11 13:59:14 2023 +0100

    Add support for the Kafka broker in Python (#1363)
    
    * Update data model
    
    * Support Kafka as messaging protocol in Python client (#1362)
    
    * Reformat code (#1362)
    
    * Update broker __init__
    
    ---------
    
    Co-authored-by: Sven Oehler <oe...@web.de>
    Co-authored-by: Sven Oehler <43...@users.noreply.github.com>
---
 streampipes-client-python/setup.py                 |  1 +
 .../streampipes/functions/broker/__init__.py       |  2 +
 .../streampipes/functions/broker/broker_handler.py |  5 +-
 .../streampipes/functions/broker/kafka_broker.py   | 96 ++++++++++++++++++++++
 .../{__init__.py => kafka_message_fetcher.py}      | 41 +++++++--
 .../streampipes/model/common.py                    |  4 +-
 .../model/resource/data_lake_measure.py            |  2 +-
 7 files changed, 138 insertions(+), 13 deletions(-)

diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py
index 4cf4b978d..2675520d0 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -35,6 +35,7 @@ base_packages = [
     "pydantic>=1.10.2",
     "requests>=2.28.1",
     "nats-py>=2.2.0",
+    "confluent-kafka>=2.0.2"
 ]
 
 dev_packages = base_packages + [
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/__init__.py
index 45ffbdc7c..32d35fba3 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/__init__.py
@@ -15,12 +15,14 @@
 # limitations under the License.
 #
 from .broker import Broker
+from .kafka_broker import KafkaBroker
 from .nats_broker import NatsBroker
 
 from .broker_handler import SupportedBroker, get_broker  # isort: skip
 
 __all__ = [
     "Broker",
+    "KafkaBroker",
     "NatsBroker",
     "SupportedBroker",
     "get_broker",
diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 2c05b839d..3aac0f986 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -16,7 +16,7 @@
 #
 from enum import Enum
 
-from streampipes.functions.broker import Broker, NatsBroker
+from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker
 from streampipes.model.resource.data_stream import DataStream
 
 
@@ -24,6 +24,7 @@ class SupportedBroker(Enum):
     """Enum for the supported brokers."""
 
     NATS = "NatsTransportProtocol"
+    KAFKA = "KafkaTransportProtocol"
 
 
 # TODO Exception should be removed once all brokers are implemented.
@@ -49,5 +50,7 @@ def get_broker(data_stream: DataStream) -> Broker:  # TODO implementation for mo
     broker_name = data_stream.event_grounding.transport_protocols[0].class_name
     if SupportedBroker.NATS.value in broker_name:
         return NatsBroker()
+    elif SupportedBroker.KAFKA.value in broker_name:
+        return KafkaBroker()
     else:
         raise UnsupportedBroker(f'The python client doesn\'t include the broker: "{broker_name}" yet')
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
new file mode 100644
index 000000000..900d7f6c1
--- /dev/null
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+from typing import Any, AsyncIterator, Dict
+
+from confluent_kafka import Consumer  # type: ignore
+from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
+from streampipes.model.common import random_letters
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaBroker(Broker):
+    """Implementation of the KafkaBroker"""
+
+    async def _makeConnection(self, hostname: str, port: int) -> None:
+        """Helper function to connect to a server.
+
+        Parameters
+        ----------
+
+        hostname: str
+            The hostname of the server, which the broker connects to.
+
+        port: int
+            The port number of the connection.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer = Consumer(
+            {"bootstrap.servers": f"{hostname}:{port}", "group.id": random_letters(6), "auto.offset.reset": "latest"}
+        )
+
+    async def createSubscription(self) -> None:
+        """Creates a subscription to a data stream.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer.subscribe([self.topic_name])
+
+        logger.info(f"Subscribed to stream: {self.stream_id}")
+
+    async def publish_event(self, event: Dict[str, Any]):
+        """Publish an event to a connected data stream.
+
+        Parameters
+        ----------
+        event: Dict[str, Any]
+            The event to be published.
+
+        Returns
+        -------
+        None
+        """
+
+        # await self.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
+
+    async def disconnect(self) -> None:
+        """Closes the connection to the server.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer.close()
+        logger.info(f"Stopped connection to stream: {self.stream_id}")
+
+    def get_message(self) -> AsyncIterator:
+        """Get the published messages of the subscription.
+
+        Returns
+        -------
+        An async iterator for the messages.
+        """
+
+        return KafkaMessageFetcher(self.kafka_consumer)
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
similarity index 53%
copy from streampipes-client-python/streampipes/functions/broker/__init__.py
copy to streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
index 45ffbdc7c..56bbf46c9 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
@@ -14,14 +14,37 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from .broker import Broker
-from .nats_broker import NatsBroker
+from confluent_kafka import Consumer  # type: ignore
 
-from .broker_handler import SupportedBroker, get_broker  # isort: skip
 
-__all__ = [
-    "Broker",
-    "NatsBroker",
-    "SupportedBroker",
-    "get_broker",
-]
+class KafkaMsg:
+    """An internal representation of a Kafka message
+
+    Parameters
+    ----------
+    data: Byte Array
+        The received Kafka message as byte array
+    """
+
+    def __init__(self, data):
+        self.data = data
+
+
+class KafkaMessageFetcher:
+    """Fetches the next message from Kafka
+
+    Parameters
+    ----------
+    consumer: Consumer
+        The Kafka consumer
+    """
+
+    def __init__(self, consumer: Consumer):
+        self.consumer = consumer
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        msg = self.consumer.poll(0.1)
+        return KafkaMsg(msg.value())
diff --git a/streampipes-client-python/streampipes/model/common.py b/streampipes-client-python/streampipes/model/common.py
index edc24b955..a6c21aa84 100644
--- a/streampipes-client-python/streampipes/model/common.py
+++ b/streampipes-client-python/streampipes/model/common.py
@@ -93,8 +93,8 @@ class EventProperty(BasicModel):
     description: Optional[StrictStr]
     runtime_name: StrictStr
     required: StrictBool = Field(default=False)
-    domain_properties: List[StrictStr] = Field(default_factory=list)
-    property_scope: StrictStr = Field(default="MEASUREMENT_PROPERTY")
+    domain_properties: Optional[List[StrictStr]] = Field(default_factory=list)
+    property_scope: Optional[StrictStr] = Field(default="MEASUREMENT_PROPERTY")
     index: StrictInt = Field(default=0)
     runtime_id: Optional[StrictStr]
     runtime_type: StrictStr = Field(default="http://www.w3.org/2001/XMLSchema#string")
diff --git a/streampipes-client-python/streampipes/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
index 1dcbe3a4a..d72732afa 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
@@ -52,4 +52,4 @@ class DataLakeMeasure(Resource):
     pipeline_id: Optional[StrictStr]
     pipeline_name: Optional[StrictStr]
     pipeline_is_running: StrictBool
-    schema_version: StrictStr
+    schema_version: Optional[StrictStr]