You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/04/26 15:19:34 UTC
[streampipes] branch 1478-distinction-between-consumer-and-publisher updated: Configured Kafka port
This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch 1478-distinction-between-consumer-and-publisher
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1478-distinction-between-consumer-and-publisher by this push:
new e30382bbd Configured Kafka port
e30382bbd is described below
commit e30382bbd55101c094e541bf900e8cb86aff76d9
Author: Sven Oehler <oe...@web.de>
AuthorDate: Wed Apr 26 17:19:33 2023 +0200
Configured Kafka port
---
.../docs/getting-started/first-steps.md | 12 +++++++++++-
...live-data-from-the-streampipes-data-stream.ipynb | 21 ++++++++++++---------
.../streampipes/functions/broker/broker.py | 5 ++++-
.../functions/broker/kafka/kafka_consumer.py | 4 ++--
.../functions/broker/kafka/kafka_message_fetcher.py | 4 +++-
.../functions/broker/kafka/kafka_publisher.py | 2 +-
.../functions/broker/nats/nats_consumer.py | 2 +-
.../functions/broker/nats/nats_publisher.py | 2 +-
.../functions/utils/async_iter_handler.py | 2 +-
9 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/streampipes-client-python/docs/getting-started/first-steps.md b/streampipes-client-python/docs/getting-started/first-steps.md
index 8c75d7636..a8f0288b4 100644
--- a/streampipes-client-python/docs/getting-started/first-steps.md
+++ b/streampipes-client-python/docs/getting-started/first-steps.md
@@ -67,7 +67,17 @@ Once all services are started, you can access StreamPipes via `http://localhost`
#### Setup StreamPipes with Kafka as message broker
Alternatively, you can use `docker-compose.yml` to start StreamPipes with Kafka as messaging layer.
-Therefore, you onyl need to execute the following command:
+When running locally we have to modify `services.kafka.environment` and add the ports to `services.kafka.ports`:
+```yaml
+environment:
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094
+ KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
+...
+ports:
+ - 9094:9094
+```
+Then, you need to execute the following command:
```bash
docker-compose -f docker-compose.yml up -d
```
diff --git a/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
index 0a2187486..5d2dcef89 100644
--- a/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
@@ -34,14 +34,14 @@
{
"cell_type": "code",
"execution_count": null,
+ "metadata": {
+ "collapsed": false
+ },
"outputs": [],
"source": [
"# You can install all required libraries for this tutorial with the following command\n",
"%pip install matplotlib ipython streampipes"
- ],
- "metadata": {
- "collapsed": false
- }
+ ]
},
{
"cell_type": "code",
@@ -53,7 +53,10 @@
"\n",
"os.environ[\"USER\"] = \"admin@streampipes.apache.org\"\n",
"os.environ[\"API-KEY\"] = \"XXX\"\n",
- "os.environ[\"BROKER-HOST\"] = \"localhost\" # Use this if you work locally"
+ "\n",
+ "# Use this if you work locally:\n",
+ "os.environ[\"BROKER-HOST\"] = \"localhost\" \n",
+ "os.environ[\"KAFKA-PORT\"] = \"9094\" # When using Kafka as message broker"
]
},
{
@@ -591,12 +594,12 @@
},
{
"cell_type": "markdown",
- "source": [
- "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don't hesitate and jump to our [next tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on applying online machine learning algorithms to StreamPipes data streams with [River](https://riverml.xyz)."
- ],
"metadata": {
"collapsed": false
- }
+ },
+ "source": [
+ "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don't hesitate and jump to our [next tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on applying online machine learning algorithms to StreamPipes data streams with [River](https://riverml.xyz)."
+ ]
},
{
"attachments": {},
diff --git a/streampipes-client-python/streampipes/functions/broker/broker.py b/streampipes-client-python/streampipes/functions/broker/broker.py
index 53b1c075c..62f645356 100644
--- a/streampipes-client-python/streampipes/functions/broker/broker.py
+++ b/streampipes-client-python/streampipes/functions/broker/broker.py
@@ -42,9 +42,12 @@ class Broker(ABC):
transport_protocol = data_stream.event_grounding.transport_protocols[0]
self.topic_name = transport_protocol.topic_definition.actual_topic_name
hostname = transport_protocol.broker_hostname
+ port = transport_protocol.port
if "BROKER-HOST" in os.environ.keys():
hostname = os.environ["BROKER-HOST"]
- await self._make_connection(hostname, transport_protocol.port)
+ if "Kafka" in transport_protocol.class_name and "KAFKA-PORT" in os.environ.keys():
+ port = int(os.environ["KAFKA-PORT"])
+ await self._make_connection(hostname, port)
@abstractmethod
async def _make_connection(self, hostname: str, port: int) -> None:
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
index 18bb97776..bcf5215c0 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_consumer.py
@@ -48,7 +48,7 @@ class KafkaConsumer(Consumer):
self.kafka_consumer = KafkaConnection(
{"bootstrap.servers": f"{hostname}:{port}", "group.id": random_letters(6), "auto.offset.reset": "latest"}
)
- logger.info(f"Connected to Kafka at {hostname}:{port}")
+ logger.info(f"Connecting to Kafka at {hostname}:{port}")
async def _create_subscription(self) -> None:
"""Creates a subscription to a data stream.
@@ -58,7 +58,7 @@ class KafkaConsumer(Consumer):
None
"""
self.kafka_consumer.subscribe([self.topic_name])
- logger.info(f"Subscribed to stream: {self.stream_id}")
+ logger.info(f"Subscribing to stream: {self.stream_id}")
async def disconnect(self) -> None:
"""Closes the connection to the server.
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
index d8e9fc3d2..efa7f31bd 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_message_fetcher.py
@@ -46,5 +46,7 @@ class KafkaMessageFetcher:
return self
async def __anext__(self):
- msg = self.consumer.poll(0.1)
+ msg = None
+ while not msg:
+ msg = self.consumer.poll(0.1)
return KafkaMessage(msg.value())
diff --git a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
index 8e8643e66..117fa2cc6 100644
--- a/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
+++ b/streampipes-client-python/streampipes/functions/broker/kafka/kafka_publisher.py
@@ -45,7 +45,7 @@ class KafkaPublisher(Publisher):
None
"""
self.kafka_producer = Producer({"bootstrap.servers": f"{hostname}:{port}"})
- logger.info(f"Connected to Kafka at {hostname}:{port}")
+ logger.info(f"Connecting to Kafka at {hostname}:{port}")
async def publish_event(self, event: Dict[str, Any]):
"""Publish an event to a connected data stream.
diff --git a/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py b/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
index 18427cb27..b57f0f35b 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/nats_consumer.py
@@ -44,7 +44,7 @@ class NatsConsumer(Consumer):
"""
self.nats_client = await connect(f"nats://{hostname}:{port}")
- logger.info(f"Connected to NATS at {hostname}:{port}")
+ logger.info(f"Connecting to NATS at {hostname}:{port}")
async def _create_subscription(self) -> None:
"""Creates a subscription to a data stream.
diff --git a/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py b/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
index 798c1940f..37ffa4e78 100644
--- a/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
+++ b/streampipes-client-python/streampipes/functions/broker/nats/nats_publisher.py
@@ -45,7 +45,7 @@ class NatsPublisher(Publisher):
"""
self.nats_client = await connect(f"nats://{hostname}:{port}")
- logger.info(f"Connected to NATS at {hostname}:{port}")
+ logger.info(f"Connecting to NATS at {hostname}:{port}")
async def publish_event(self, event: Dict[str, Any]):
"""Publish an event to a connected data stream.
diff --git a/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py b/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
index 2b69dfd6d..62a742d5b 100644
--- a/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
+++ b/streampipes-client-python/streampipes/functions/utils/async_iter_handler.py
@@ -39,7 +39,7 @@ class AsyncIterHandler:
"""
try:
return stream_id, await message.__anext__()
- except StopAsyncIteration:
+ except (StopAsyncIteration, RuntimeError):
return "stop", None
@staticmethod