You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/03/11 12:59:20 UTC
[streampipes] branch dev updated: Add support for the Kafka broker in Python (#1363)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 7fe524e47 Add support for the Kafka broker in Python (#1363)
7fe524e47 is described below
commit 7fe524e475e404a9289b2ca39e6d31c6c71aa67b
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sat Mar 11 13:59:14 2023 +0100
Add support for the Kafka broker in Python (#1363)
* Update data model
* Support Kafka as messaging protocol in Python client (#1362)
* Reformat code (#1362)
* Update broker __init__
---------
Co-authored-by: Sven Oehler <oe...@web.de>
Co-authored-by: Sven Oehler <43...@users.noreply.github.com>
---
streampipes-client-python/setup.py | 1 +
.../streampipes/functions/broker/__init__.py | 2 +
.../streampipes/functions/broker/broker_handler.py | 5 +-
.../streampipes/functions/broker/kafka_broker.py | 96 ++++++++++++++++++++++
.../{__init__.py => kafka_message_fetcher.py} | 41 +++++++--
.../streampipes/model/common.py | 4 +-
.../model/resource/data_lake_measure.py | 2 +-
7 files changed, 138 insertions(+), 13 deletions(-)
diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py
index 4cf4b978d..2675520d0 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -35,6 +35,7 @@ base_packages = [
"pydantic>=1.10.2",
"requests>=2.28.1",
"nats-py>=2.2.0",
+ "confluent-kafka>=2.0.2"
]
dev_packages = base_packages + [
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/__init__.py
index 45ffbdc7c..32d35fba3 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/__init__.py
@@ -15,12 +15,14 @@
# limitations under the License.
#
from .broker import Broker
+from .kafka_broker import KafkaBroker
from .nats_broker import NatsBroker
from .broker_handler import SupportedBroker, get_broker # isort: skip
__all__ = [
"Broker",
+ "KafkaBroker",
"NatsBroker",
"SupportedBroker",
"get_broker",
diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
index 2c05b839d..3aac0f986 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py
@@ -16,7 +16,7 @@
#
from enum import Enum
-from streampipes.functions.broker import Broker, NatsBroker
+from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker
from streampipes.model.resource.data_stream import DataStream
@@ -24,6 +24,7 @@ class SupportedBroker(Enum):
"""Enum for the supported brokers."""
NATS = "NatsTransportProtocol"
+ KAFKA = "KafkaTransportProtocol"
# TODO Exception should be removed once all brokers are implemented.
@@ -49,5 +50,7 @@ def get_broker(data_stream: DataStream) -> Broker: # TODO implementation for mo
broker_name = data_stream.event_grounding.transport_protocols[0].class_name
if SupportedBroker.NATS.value in broker_name:
return NatsBroker()
+ elif SupportedBroker.KAFKA.value in broker_name:
+ return KafkaBroker()
else:
raise UnsupportedBroker(f'The python client doesn\'t include the broker: "{broker_name}" yet')
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
new file mode 100644
index 000000000..900d7f6c1
--- /dev/null
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+from typing import Any, AsyncIterator, Dict
+
+from confluent_kafka import Consumer # type: ignore
+from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
+from streampipes.model.common import random_letters
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaBroker(Broker):
+ """Implementation of the NatsBroker"""
+
+ async def _makeConnection(self, hostname: str, port: int) -> None:
+ """Helper function to connect to a server.
+
+ Parameters
+ ----------
+
+ hostname: str
+ The hostname of the of the server, which the broker connects to.
+
+ port: int
+ The port number of the connection.
+
+ Returns
+ -------
+ None
+ """
+ self.kafka_consumer = Consumer(
+ {"bootstrap.servers": f"{hostname}:{port}", "group.id": random_letters(6), "auto.offset.reset": "latest"}
+ )
+
+ async def createSubscription(self) -> None:
+ """Creates a subscription to a data stream.
+
+ Returns
+ -------
+ None
+ """
+ self.kafka_consumer.subscribe([self.topic_name])
+
+ logger.info(f"Subscribed to stream: {self.stream_id}")
+
+ async def publish_event(self, event: Dict[str, Any]):
+ """Publish an event to a connected data stream.
+
+ Parameters
+ ----------
+ event: Dict[str, Any]
+ The event to be published.
+
+ Returns
+ -------
+ None
+ """
+
+ # await self.publish(subject=self.topic_name, payload=json.dumps(event).encode("utf-8"))
+
+ async def disconnect(self) -> None:
+ """Closes the connection to the server.
+
+ Returns
+ -------
+ None
+ """
+ self.kafka_consumer.close()
+ logger.info(f"Stopped connection to stream: {self.stream_id}")
+
+ def get_message(self) -> AsyncIterator:
+ """Get the published messages of the subscription.
+
+ Returns
+ -------
+ An async iterator for the messages.
+ """
+
+ return KafkaMessageFetcher(self.kafka_consumer)
diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
similarity index 53%
copy from streampipes-client-python/streampipes/functions/broker/__init__.py
copy to streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
index 45ffbdc7c..56bbf46c9 100644
--- a/streampipes-client-python/streampipes/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py
@@ -14,14 +14,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from .broker import Broker
-from .nats_broker import NatsBroker
+from confluent_kafka import Consumer # type: ignore
-from .broker_handler import SupportedBroker, get_broker # isort: skip
-__all__ = [
- "Broker",
- "NatsBroker",
- "SupportedBroker",
- "get_broker",
-]
+class KafkaMsg:
+ """An internal representation of a Kafka message
+
+ Parameters
+ ----------
+ data: Byte Array
+ The received Kafka message as byte array
+ """
+
+ def __init__(self, data):
+ self.data = data
+
+
+class KafkaMessageFetcher:
+ """Fetches the next message from Kafka
+
+ Parameters
+ ----------
+ consumer: Consumer
+ The Kafka consumer
+ """
+
+ def __init__(self, consumer: Consumer):
+ self.consumer = consumer
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ msg = self.consumer.poll(0.1)
+ return KafkaMsg(msg.value())
diff --git a/streampipes-client-python/streampipes/model/common.py b/streampipes-client-python/streampipes/model/common.py
index edc24b955..a6c21aa84 100644
--- a/streampipes-client-python/streampipes/model/common.py
+++ b/streampipes-client-python/streampipes/model/common.py
@@ -93,8 +93,8 @@ class EventProperty(BasicModel):
description: Optional[StrictStr]
runtime_name: StrictStr
required: StrictBool = Field(default=False)
- domain_properties: List[StrictStr] = Field(default_factory=list)
- property_scope: StrictStr = Field(default="MEASUREMENT_PROPERTY")
+ domain_properties: Optional[List[StrictStr]] = Field(default_factory=list)
+ property_scope: Optional[StrictStr] = Field(default="MEASUREMENT_PROPERTY")
index: StrictInt = Field(default=0)
runtime_id: Optional[StrictStr]
runtime_type: StrictStr = Field(default="http://www.w3.org/2001/XMLSchema#string")
diff --git a/streampipes-client-python/streampipes/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
index 1dcbe3a4a..d72732afa 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
@@ -52,4 +52,4 @@ class DataLakeMeasure(Resource):
pipeline_id: Optional[StrictStr]
pipeline_name: Optional[StrictStr]
pipeline_is_running: StrictBool
- schema_version: StrictStr
+ schema_version: Optional[StrictStr]