You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/04/26 15:19:34 UTC

[streampipes] branch 1478-distinction-between-consumer-and-publisher updated: Configured Kafka port

This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 1478-distinction-between-consumer-and-publisher
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1478-distinction-between-consumer-and-publisher by this push:
     new e30382bbd Configured Kafka port
e30382bbd is described below

commit e30382bbd55101c094e541bf900e8cb86aff76d9
Author: Sven Oehler <oe...@web.de>
AuthorDate: Wed Apr 26 17:19:33 2023 +0200

    Configured Kafka port
---
 .../docs/getting-started/first-steps.md             | 12 +++++++++++-
 ...live-data-from-the-streampipes-data-stream.ipynb | 21 ++++++++++++---------
 .../streampipes/functions/broker/broker.py          |  5 ++++-
 .../functions/broker/kafka/kafka_consumer.py        |  4 ++--
 .../functions/broker/kafka/kafka_message_fetcher.py |  4 +++-
 .../functions/broker/kafka/kafka_publisher.py       |  2 +-
 .../functions/broker/nats/nats_consumer.py          |  2 +-
 .../functions/broker/nats/nats_publisher.py         |  2 +-
 .../functions/utils/async_iter_handler.py           |  2 +-
 9 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/streampipes-client-python/docs/getting-started/first-steps.md b/streampipes-client-python/docs/getting-started/first-steps.md
index 8c75d7636..a8f0288b4 100644
--- a/streampipes-client-python/docs/getting-started/first-steps.md
+++ b/streampipes-client-python/docs/getting-started/first-steps.md
@@ -67,7 +67,17 @@ Once all services are started, you can access StreamPipes via `http://localhost`
 
 #### Setup StreamPipes with Kafka as message broker
 Alternatively, you can use `docker-compose.yml` to start StreamPipes with Kafka as messaging layer.
-Therefore, you onyl need to execute the following command:
+When running locally, we have to modify `services.kafka.environment` and add the ports to `services.kafka.ports`:
+```yaml
+environment:
+  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+  KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094
+  KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
+...
+ports:
+  - 9094:9094
+```
+Then, you need to execute the following command:
 ```bash
 docker-compose -f docker-compose.yml up -d
 ```
diff --git a/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
index 0a2187486..5d2dcef89 100644
--- a/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
@@ -34,14 +34,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
    "outputs": [],
    "source": [
     "# You can install all required libraries for this tutorial with the following command\n",
     "%pip install matplotlib ipython streampipes"
-   ],
-   "metadata": {
-    "collapsed": false
-   }
+   ]
   },
   {
    "cell_type": "code",
@@ -53,7 +53,10 @@
     "\n",
     "os.environ[\"USER\"] = \"admin@streampipes.apache.org\"\n",
     "os.environ[\"API-KEY\"] = \"XXX\"\n",
-    "os.environ[\"BROKER-HOST\"] = \"localhost\"  # Use this if you work locally"
+    "\n",
+    "# Use this if you work locally:\n",
+    "os.environ[\"BROKER-HOST\"] = \"localhost\"  \n",
+    "os.environ[\"KAFKA-PORT\"] = \"9094\" # When using Kafka as message broker"
    ]
   },
   {
@@ -591,12 +594,12 @@
   },
   {
    "cell_type": "markdown",
-   "source": [
-    "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don't hesitate and jump to our [next tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on applying online machine learning algorithms to StreamPipes data streams with [River](https://riverml.xyz)."
-   ],
    "metadata": {
     "collapsed": false
-   }
+   },
+   "source": [
+    "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don't hesitate and jump to our [next tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on applying online machine learning algorithms to StreamPipes data streams with [River](https://riverml.xyz)."
+   ]
   },
   {
    "attachments": {},
diff --git a/streampipes-client-python/streampipes/functions/broker/broker.py b/streampipes-client-python/streampipes/functions/broker/broker.py
index 53b1c075c..62f645356 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker.py
@@ -42,9 +42,12 @@ class Broker(ABC):
         transport_protocol = data_stream.event_grounding.transport_protocols[0]
         self.topic_name = transport_protocol.topic_definition.actual_topic_name
         hostname = transport_protocol.broker_hostname
+        port = transport_protocol.port
         if "BROKER-HOST" in os.environ.keys():
             hostname = os.environ["BROKER-HOST"]
-        await self._make_connection(hostname, transport_protocol.port)
+            if "Kafka" in transport_protocol.class_name and "KAFKA-PORT" in os.environ.keys():
+                port = int(os.environ["KAFKA-PORT"])
+        await self._make_connection(hostname, port)
 
     @abstractmethod
     async def _make_connection(self, hostname: str, port: int) -> None:
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
index 18bb97776..bcf5215c0 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
@@ -48,7 +48,7 @@ class KafkaConsumer(Consumer):
         self.kafka_consumer = KafkaConnection(
             {"bootstrap.servers": f"{hostname}:{port}", "group.id": random_letters(6), "auto.offset.reset": "latest"}
         )
-        logger.info(f"Connected to Kafka at {hostname}:{port}")
+        logger.info(f"Connecting to Kafka at {hostname}:{port}")
 
     async def _create_subscription(self) -> None:
         """Creates a subscription to a data stream.
@@ -58,7 +58,7 @@ class KafkaConsumer(Consumer):
         None
         """
         self.kafka_consumer.subscribe([self.topic_name])
-        logger.info(f"Subscribed to stream: {self.stream_id}")
+        logger.info(f"Subscribing to stream: {self.stream_id}")
 
     async def disconnect(self) -> None:
         """Closes the connection to the server.
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
index d8e9fc3d2..efa7f31bd 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
@@ -46,5 +46,7 @@ class KafkaMessageFetcher:
         return self
 
     async def __anext__(self):
-        msg = self.consumer.poll(0.1)
+        msg = None
+        while not msg:
+            msg = self.consumer.poll(0.1)
         return KafkaMessage(msg.value())
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
index 8e8643e66..117fa2cc6 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
@@ -45,7 +45,7 @@ class KafkaPublisher(Publisher):
         None
         """
         self.kafka_producer = Producer({"bootstrap.servers": f"{hostname}:{port}"})
-        logger.info(f"Connected to Kafka at {hostname}:{port}")
+        logger.info(f"Connecting to Kafka at {hostname}:{port}")
 
     async def publish_event(self, event: Dict[str, Any]):
         """Publish an event to a connected data stream.
diff --git a/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py b/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
index 18427cb27..b57f0f35b 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
@@ -44,7 +44,7 @@ class NatsConsumer(Consumer):
 
         """
         self.nats_client = await connect(f"nats://{hostname}:{port}")
-        logger.info(f"Connected to NATS at {hostname}:{port}")
+        logger.info(f"Connecting to NATS at {hostname}:{port}")
 
     async def _create_subscription(self) -> None:
         """Creates a subscription to a data stream.
diff --git a/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py b/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
index 798c1940f..37ffa4e78 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
@@ -45,7 +45,7 @@ class NatsPublisher(Publisher):
 
         """
         self.nats_client = await connect(f"nats://{hostname}:{port}")
-        logger.info(f"Connected to NATS at {hostname}:{port}")
+        logger.info(f"Connecting to NATS at {hostname}:{port}")
 
     async def publish_event(self, event: Dict[str, Any]):
         """Publish an event to a connected data stream.
diff --git a/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py b/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
index 2b69dfd6d..62a742d5b 100644
--- a/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
+++ b/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
@@ -39,7 +39,7 @@ class AsyncIterHandler:
         """
         try:
             return stream_id, await message.__anext__()
-        except StopAsyncIteration:
+        except (StopAsyncIteration, RuntimeError):
             return "stop", None
 
     @staticmethod