You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@streampipes.apache.org by "SvenO3 (via GitHub)" <gi...@apache.org> on 2023/04/26 17:06:00 UTC

[PR] Python Functions: Distinction between consumer and publisher (streampipes)

SvenO3 opened a new pull request, #1523:
URL: https://github.com/apache/streampipes/pull/1523

   <!--
     ~ Licensed to the Apache Software Foundation (ASF) under one or more
     ~ contributor license agreements.  See the NOTICE file distributed with
     ~ this work for additional information regarding copyright ownership.
     ~ The ASF licenses this file to You under the Apache License, Version 2.0
     ~ (the "License"); you may not use this file except in compliance with
     ~ the License.  You may obtain a copy of the License at
     ~
     ~    http://www.apache.org/licenses/LICENSE-2.0
     ~
     ~ Unless required by applicable law or agreed to in writing, software
     ~ distributed under the License is distributed on an "AS IS" BASIS,
     ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     ~ See the License for the specific language governing permissions and
     ~ limitations under the License.
     ~
     -->
     
     <!--
   Thanks for contributing! Here are some tips you can follow to help us incorporate your contribution quickly and easily:
   1. If this is your first time, please read our contributor guidelines:
       - https://streampipes.apache.org/getinvolved.html
       - https://cwiki.apache.org/confluence/display/STREAMPIPES/Getting+Started
   2. Make sure the PR title is formatted like: `[#<GitHub issue id>] PR title ...`
   3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., `[WIP][#<GitHub issue id>] PR title ...`.
   4. Please write your PR title to summarize what this PR proposes/fixes.
   5. Link the PR to the corresponding GitHub issue (if present) in the `Development` section in the right menu bar. 
   6. Be sure to keep the PR description updated to reflect all changes.
   7. If possible, provide a concise example to reproduce the issue for a faster review.
   8. Make sure tests pass via `mvn clean install`.
   9. (Optional) If the contribution is large, please file an Apache ICLA
       - http://apache.org/licenses/icla.pdf
   -->
   
   ### Purpose
   <!--
   Please clarify what changes you are proposing and describe how those changes will address the issue.
   Furthermore, describe potential consequences the changes might have.
   -->
   Fixes #1478 and enables to create output data streams with Kafka as message broker.
   ### Remarks
   <!--
   Is there anything left we need to pay attention on?
   Are there some references that might be important? E.g. links to Confluence, or discussions
   on the mailing list or GitHub.
   -->
   PR introduces (a) breaking change(s): no
   
   PR introduces (a) deprecation(s): no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "SvenO3 (via GitHub)" <gi...@apache.org>.
SvenO3 commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182205932


##########
streampipes-client-python/streampipes/functions/utils/data_stream_generator.py:
##########
@@ -83,6 +94,19 @@ def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optiona
         ]
     )
 
-    if not stream_id:
-        return DataStream(name=name, event_schema=event_schema)
-    return DataStream(element_id=stream_id, name=name, event_schema=event_schema)
+    transport_protocols = [TransportProtocol()]
+    if broker == SupportedBroker.KAFKA:
+        transport_protocols = [
+            TransportProtocol(
+                class_name="org.apache.streampipes.model.grounding.KafkaTransportProtocol",  # type: ignore

Review Comment:
   Because `class_name` is an alias from pydantic, mypy says that it doesn't exists 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "SvenO3 (via GitHub)" <gi...@apache.org>.
SvenO3 commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182193674


##########
streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb:
##########
@@ -324,7 +327,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from river import cluster, compose, preprocessing, tree\n",
+    "import pickle\n",

Review Comment:
   It's part of the python standard library. So you normally don't have to install it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "SvenO3 (via GitHub)" <gi...@apache.org>.
SvenO3 commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182219384


##########
streampipes-client-python/streampipes/function_zoo/river_function.py:
##########
@@ -187,7 +188,12 @@ def __init__(
             attributes["truth"] = prediction_type
             if target_label is None:
                 raise ValueError("You must define a target attribute for a supervised model.")
-        output_stream = create_data_stream("prediction", attributes)
+
+        output_stream = create_data_stream(
+            name="prediction",
+            attributes=attributes,
+            broker=get_broker_enum(client.dataStreamApi.get(stream_ids[0])),  # type: ignore

Review Comment:
   The problem is that `get` returns a `Ressource`, but `get_broker_enum` expects a `DataStream` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
bossenti commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182807354


##########
streampipes-client-python/streampipes/functions/utils/data_stream_generator.py:
##########
@@ -83,6 +94,19 @@ def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optiona
         ]
     )
 
-    if not stream_id:
-        return DataStream(name=name, event_schema=event_schema)
-    return DataStream(element_id=stream_id, name=name, event_schema=event_schema)
+    transport_protocols = [TransportProtocol()]
+    if broker == SupportedBroker.KAFKA:
+        transport_protocols = [
+            TransportProtocol(
+                class_name="org.apache.streampipes.model.grounding.KafkaTransportProtocol",  # type: ignore

Review Comment:
   Okay, we leave it for now, but it should work :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "SvenO3 (via GitHub)" <gi...@apache.org>.
SvenO3 commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182237372


##########
streampipes-client-python/streampipes/functions/broker/output_collector.py:
##########
@@ -32,14 +32,14 @@ class OutputCollector:
 
     Attributes
     ----------
-    broker: Broker
-        The broker instance that sends the data to StreamPipes
+    publisher: Publisher
+        The publisher instance that sends the data to StreamPipes
 
     """
 
     def __init__(self, data_stream: DataStream) -> None:
-        self.broker = get_broker(data_stream)
-        self._run_coroutine(self.broker.connect(data_stream))
+        self.publisher: Publisher = get_broker(data_stream, is_publisher=True)  # type: ignore

Review Comment:
   We could use a `cast` like this:
   ```
   self.publisher = cast(Publisher, get_broker(data_stream, is_publisher=True))
   ```
   Do you think this is a better solution or does it make the code less readable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "SvenO3 (via GitHub)" <gi...@apache.org>.
SvenO3 commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1183678987


##########
streampipes-client-python/streampipes/functions/broker/output_collector.py:
##########
@@ -32,14 +32,14 @@ class OutputCollector:
 
     Attributes
     ----------
-    broker: Broker
-        The broker instance that sends the data to StreamPipes
+    publisher: Publisher
+        The publisher instance that sends the data to StreamPipes
 
     """
 
     def __init__(self, data_stream: DataStream) -> None:
-        self.broker = get_broker(data_stream)
-        self._run_coroutine(self.broker.connect(data_stream))
+        self.publisher: Publisher = get_broker(data_stream, is_publisher=True)  # type: ignore

Review Comment:
   Yes, the return type is the parent class `Broker`, but mypy shows the error:
   ```
   Incompatible types in assignment (expression has type "Broker", variable has type "Publisher")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
bossenti commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1181187352


##########
streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb:
##########
@@ -324,7 +327,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from river import cluster, compose, preprocessing, tree\n",
+    "import pickle\n",

Review Comment:
   We should add this to the command `pip install river streampipes` as well



##########
streampipes-client-python/streampipes/functions/broker/broker_handler.py:
##########
@@ -55,8 +63,37 @@ def get_broker(data_stream: DataStream) -> Broker:  # TODO implementation for mo
     """
     broker_name = data_stream.event_grounding.transport_protocols[0].class_name
     if SupportedBroker.NATS.value in broker_name:
-        return NatsBroker()
+        if is_publisher:
+            return NatsPublisher()
+        return NatsConsumer()
     elif SupportedBroker.KAFKA.value in broker_name:
-        return KafkaBroker()
+        if is_publisher:
+            return KafkaPublisher()
+        return KafkaConsumer()
     else:
         raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')
+
+
+def get_broker_enum(data_stream: DataStream) -> SupportedBroker:
+    """Derive the enum of the broker for the given data stream.
+
+    Parameters
+    ----------
+    data_stream: DataStream
+        Data stream instance from which the broker is inferred
+
+    Returns
+    -------
+    broker: SupportedBroker
+        The corresponding broker enum derived from data stream.
+
+    Raises
+    ------
+    UnsupportedBrokerError
+        Is raised when the given data stream belongs to a broker that is currently not supported by StreamPipes Python.
+    """
+    broker_name = data_stream.event_grounding.transport_protocols[0].class_name
+    for b in SupportedBroker:
+        if b.value in broker_name:
+            return b
+    raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')

Review Comment:
   ... doesn't support...



##########
streampipes-client-python/streampipes/functions/broker/__init__.py:
##########
@@ -15,15 +15,26 @@
 # limitations under the License.
 #
 from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
+from .consumer import Consumer
+from .publisher import Publisher
+
+# isort: split

Review Comment:
   is this really needed?



##########
streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb:
##########
@@ -324,7 +327,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from river import cluster, compose, preprocessing, tree\n",
+    "import pickle\n",

Review Comment:
   We should add this to the command `pip install river streampipes` as well



##########
streampipes-client-python/streampipes/functions/utils/data_stream_generator.py:
##########
@@ -83,6 +94,19 @@ def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optiona
         ]
     )
 
-    if not stream_id:
-        return DataStream(name=name, event_schema=event_schema)
-    return DataStream(element_id=stream_id, name=name, event_schema=event_schema)
+    transport_protocols = [TransportProtocol()]
+    if broker == SupportedBroker.KAFKA:
+        transport_protocols = [
+            TransportProtocol(
+                class_name="org.apache.streampipes.model.grounding.KafkaTransportProtocol",  # type: ignore

Review Comment:
   muting mypy shouldn't be necessary here



##########
streampipes-client-python/streampipes/functions/function_handler.py:
##########
@@ -77,7 +77,7 @@ def initializeFunctions(self) -> None:
                 # Get the data stream schema from the API
                 data_stream: DataStream = self.client.dataStreamApi.get(stream_id)  # type: ignore
                 # Get the broker
-                broker = get_broker(data_stream)
+                broker: Consumer = get_broker(data_stream)  # type: ignore

Review Comment:
   same as for publisher



##########
streampipes-client-python/streampipes/functions/broker/output_collector.py:
##########
@@ -32,14 +32,14 @@ class OutputCollector:
 
     Attributes
     ----------
-    broker: Broker
-        The broker instance that sends the data to StreamPipes
+    publisher: Publisher
+        The publisher instance that sends the data to StreamPipes
 
     """
 
     def __init__(self, data_stream: DataStream) -> None:
-        self.broker = get_broker(data_stream)
-        self._run_coroutine(self.broker.connect(data_stream))
+        self.publisher: Publisher = get_broker(data_stream, is_publisher=True)  # type: ignore

Review Comment:
   Can't you get rid of `# type: ignore` here? 



##########
streampipes-client-python/streampipes/functions/broker/broker_handler.py:
##########
@@ -55,8 +63,37 @@ def get_broker(data_stream: DataStream) -> Broker:  # TODO implementation for mo
     """
     broker_name = data_stream.event_grounding.transport_protocols[0].class_name
     if SupportedBroker.NATS.value in broker_name:
-        return NatsBroker()
+        if is_publisher:
+            return NatsPublisher()
+        return NatsConsumer()
     elif SupportedBroker.KAFKA.value in broker_name:
-        return KafkaBroker()
+        if is_publisher:
+            return KafkaPublisher()
+        return KafkaConsumer()
     else:
         raise UnsupportedBrokerError(f'The python client doesn\'t include the broker: "{broker_name}" yet')
+
+
+def get_broker_enum(data_stream: DataStream) -> SupportedBroker:

Review Comment:
   Suggestion: `get_broker_description`



##########
streampipes-client-python/streampipes/function_zoo/river_function.py:
##########
@@ -187,7 +188,12 @@ def __init__(
             attributes["truth"] = prediction_type
             if target_label is None:
                 raise ValueError("You must define a target attribute for a supervised model.")
-        output_stream = create_data_stream("prediction", attributes)
+
+        output_stream = create_data_stream(
+            name="prediction",
+            attributes=attributes,
+            broker=get_broker_enum(client.dataStreamApi.get(stream_ids[0])),  # type: ignore

Review Comment:
   there shouldn't be a need to mute mypy here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "SvenO3 (via GitHub)" <gi...@apache.org>.
SvenO3 commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182200394


##########
streampipes-client-python/streampipes/functions/broker/output_collector.py:
##########
@@ -32,14 +32,14 @@ class OutputCollector:
 
     Attributes
     ----------
-    broker: Broker
-        The broker instance that sends the data to StreamPipes
+    publisher: Publisher
+        The publisher instance that sends the data to StreamPipes
 
     """
 
     def __init__(self, data_stream: DataStream) -> None:
-        self.broker = get_broker(data_stream)
-        self._run_coroutine(self.broker.connect(data_stream))
+        self.publisher: Publisher = get_broker(data_stream, is_publisher=True)  # type: ignore

Review Comment:
   Maybe, but `get_broker` returns a `Publisher` or `Subscriber`. Do you know a solution for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
bossenti commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182808879


##########
streampipes-client-python/streampipes/function_zoo/river_function.py:
##########
@@ -187,7 +188,12 @@ def __init__(
             attributes["truth"] = prediction_type
             if target_label is None:
                 raise ValueError("You must define a target attribute for a supervised model.")
-        output_stream = create_data_stream("prediction", attributes)
+
+        output_stream = create_data_stream(
+            name="prediction",
+            attributes=attributes,
+            broker=get_broker_enum(client.dataStreamApi.get(stream_ids[0])),  # type: ignore

Review Comment:
   But `DataStream` is a `Resource` right? 
   But nevermind, we can keep it for now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
bossenti commented on PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#issuecomment-1529658979

   Thanks a lot for improving the broker setup again @SvenO3 🙏🏼 
   If you find some time it would be awesome if you could improve the test coverage of the broker module


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
bossenti commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1185416906


##########
streampipes-client-python/streampipes/functions/broker/output_collector.py:
##########
@@ -32,14 +32,14 @@ class OutputCollector:
 
     Attributes
     ----------
-    broker: Broker
-        The broker instance that sends the data to StreamPipes
+    publisher: Publisher
+        The publisher instance that sends the data to StreamPipes
 
     """
 
     def __init__(self, data_stream: DataStream) -> None:
-        self.broker = get_broker(data_stream)
-        self._run_coroutine(self.broker.connect(data_stream))
+        self.publisher: Publisher = get_broker(data_stream, is_publisher=True)  # type: ignore

Review Comment:
   Okay, then let's just stick with `type:ignore`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
bossenti commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182784085


##########
streampipes-client-python/streampipes/functions/broker/output_collector.py:
##########
@@ -32,14 +32,14 @@ class OutputCollector:
 
     Attributes
     ----------
-    broker: Broker
-        The broker instance that sends the data to StreamPipes
+    publisher: Publisher
+        The publisher instance that sends the data to StreamPipes
 
     """
 
     def __init__(self, data_stream: DataStream) -> None:
-        self.broker = get_broker(data_stream)
-        self._run_coroutine(self.broker.connect(data_stream))
+        self.publisher: Publisher = get_broker(data_stream, is_publisher=True)  # type: ignore

Review Comment:
   If it's only about two return types, you can do `get_broker(...) -> Union[Publisher | Subscriber]` 
   But they have the same parent class right?
   So in this case the parent class should work as return type as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
bossenti commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182806807


##########
streampipes-client-python/streampipes/functions/utils/data_stream_generator.py:
##########
@@ -83,6 +94,19 @@ def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optiona
         ]
     )
 
-    if not stream_id:
-        return DataStream(name=name, event_schema=event_schema)
-    return DataStream(element_id=stream_id, name=name, event_schema=event_schema)
+    transport_protocols = [TransportProtocol()]
+    if broker == SupportedBroker.KAFKA:
+        transport_protocols = [
+            TransportProtocol(
+                class_name="org.apache.streampipes.model.grounding.KafkaTransportProtocol",  # type: ignore
+                broker_hostname="kafka",
+                kafkaPort=9092,

Review Comment:
   `kafkaPort` should be `port` right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "bossenti (via GitHub)" <gi...@apache.org>.
bossenti commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182780416


##########
streampipes-client-python/docs/tutorials/4-using-online-machine-learning-on-a-streampipes-data-stream.ipynb:
##########
@@ -324,7 +327,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from river import cluster, compose, preprocessing, tree\n",
+    "import pickle\n",

Review Comment:
   good point 👍🏼 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "SvenO3 (via GitHub)" <gi...@apache.org>.
SvenO3 commented on code in PR #1523:
URL: https://github.com/apache/streampipes/pull/1523#discussion_r1182212567


##########
streampipes-client-python/streampipes/functions/broker/__init__.py:
##########
@@ -15,15 +15,26 @@
 # limitations under the License.
 #
 from .broker import Broker
-from .kafka_broker import KafkaBroker
-from .nats_broker import NatsBroker
+from .consumer import Consumer
+from .publisher import Publisher
+
+# isort: split

Review Comment:
   Yes, because the `KafkaPublisher` and `NatsPublisher` import the `Publisher`, it has to be imported first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Python Functions: Distinction between consumer and publisher (streampipes)

Posted by "tenthe (via GitHub)" <gi...@apache.org>.
tenthe merged PR #1523:
URL: https://github.com/apache/streampipes/pull/1523


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org