Posted to dev@streampipes.apache.org by GitBox <gi...@apache.org> on 2022/12/07 15:30:48 UTC

[GitHub] [streampipes] SvenO3 opened a new pull request, #855: [#854] First implementation of the StreamPipesFunctions in python

SvenO3 opened a new pull request, #855:
URL: https://github.com/apache/streampipes/pull/855

   ### Purpose
   This is a first implementation of the StreamPipesFunctions in Python. It allows users to consume the live data of data streams from Python code.
   At the moment, only the NATS messaging protocol is supported.
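The idea of routing the live data of a stream into several Python functions can be pictured as a fan-out loop. The following is a minimal stdlib-only sketch, not the PR's implementation: it assumes an `asyncio.Queue` of JSON payloads stands in for the NATS subscription, and `StreamContext`, `dispatch`, and the `(event, stream_id)` callback signature are hypothetical names.

```python
import asyncio
import json
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, List, Optional

# Hypothetical signature for a function's event callback: (event, stream_id) -> None
OnEvent = Callable[[dict, str], Awaitable[None]]


@dataclass
class StreamContext:
    """Hypothetical per-stream context: every function subscribed to one data stream."""

    functions: List[OnEvent] = field(default_factory=list)


async def dispatch(stream_id: str, source: asyncio.Queue, context: StreamContext) -> None:
    """Decode raw messages from `source` and broadcast each event to all functions of the stream."""
    while True:
        raw: Optional[bytes] = await source.get()
        if raw is None:  # sentinel that ends this sketch's stream
            break
        event = json.loads(raw)
        # Every function registered for this stream receives the same event
        await asyncio.gather(*(fn(event, stream_id) for fn in context.functions))


async def main() -> Dict[str, List[int]]:
    received: Dict[str, List[int]] = {"a": [], "b": []}

    async def function_a(event: dict, stream_id: str) -> None:
        received["a"].append(event["value"])

    async def function_b(event: dict, stream_id: str) -> None:
        received["b"].append(event["value"] * 10)

    # The queue stands in for a broker subscription delivering JSON payloads
    queue: asyncio.Queue = asyncio.Queue()
    for value in (1, 2, 3):
        queue.put_nowait(json.dumps({"value": value}).encode())
    queue.put_nowait(None)

    await dispatch("stream-1", queue, StreamContext([function_a, function_b]))
    return received


result = asyncio.run(main())
print(result)  # {'a': [1, 2, 3], 'b': [10, 20, 30]}
```

Awaiting `gather` per event keeps the events in order for every function, at the cost of one slow function delaying the others.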
   
   ### Remarks
   PR introduces (a) breaking change(s): no
   
   PR introduces (a) deprecation(s): no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [streampipes] SvenO3 commented on a diff in pull request #855: [#854] First implementation of the StreamPipesFunctions in python

Posted by GitBox <gi...@apache.org>.
SvenO3 commented on code in PR #855:
URL: https://github.com/apache/streampipes/pull/855#discussion_r1045553211


##########
streampipes-client-python/streampipes_client/functions/broker/broker.py:
##########
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import AsyncIterator
+
+from streampipes_client.model.resource.data_stream import DataStream
+
+
+class Broker(ABC):
+    """Abstract implementation of a broker.
+    A broker is used to subscribe to a data stream and to consume the published events.
+    """
+
+    async def connect(self, data_stream: DataStream, host_address: str) -> None:

Review Comment:
   Yes, the method waits until the connection is established. The `async` keyword is only needed because connecting to the broker requires `await`: the `connect` function of the nats library is a coroutine.
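To illustrate why `async` here does not mean fire-and-forget: awaiting a coroutine suspends the caller until that coroutine has finished. A minimal sketch, assuming a simplified `connect(host_address)` signature (the PR's method also takes a `DataStream`) and a hypothetical `SimulatedBroker` whose `asyncio.sleep` stands in for the nats `connect` coroutine:

```python
import asyncio
from abc import ABC, abstractmethod


class Broker(ABC):
    """Simplified sketch of an abstract broker with an asynchronous connect."""

    @abstractmethod
    async def connect(self, host_address: str) -> None:
        """Connect to the broker; returns only once the connection is established."""


class SimulatedBroker(Broker):
    """Hypothetical broker; asyncio.sleep stands in for the network handshake."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self, host_address: str) -> None:
        await asyncio.sleep(0.01)  # the caller is suspended here, not skipped
        self.connected = True


async def main() -> bool:
    broker = SimulatedBroker()
    await broker.connect("localhost")
    # Reached only after connect() has finished
    return broker.connected


async def fire_and_forget() -> bool:
    broker = SimulatedBroker()
    task = asyncio.create_task(broker.connect("localhost"))
    state_before = broker.connected  # the task has not started running yet
    await task
    return state_before


print(asyncio.run(main()))  # True
print(asyncio.run(fire_and_forget()))  # False
```

Only when the coroutine is wrapped in a task and not awaited right away does the caller continue before the connection exists.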



[GitHub] [streampipes] SvenO3 commented on a diff in pull request #855: [#854] First implementation of the StreamPipesFunctions in python

Posted by GitBox <gi...@apache.org>.
SvenO3 commented on code in PR #855:
URL: https://github.com/apache/streampipes/pull/855#discussion_r1045562722


##########
streampipes-client-python/streampipes_client/functions/broker/nats_broker.py:
##########
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+from typing import AsyncIterator
+
+from nats import connect
+from nats.aio.client import Client
+from nats.aio.subscription import Subscription
+from streampipes_client.functions.broker.broker import Broker
+
+logger = logging.getLogger(__name__)
+
+
+class NatsBroker(Broker):
+    """Implementation of the NatsBroker"""
+
+    nats_client: Client
+    subscription: Subscription
+
+    async def _makeConnection(self, host_address: str, port: int) -> None:
+        """Helper function to connect to a server.
+
+        Parameters
+        ----------
+
+        host_address: str
+            The host address of the server, which the broker connects to.
+
+        port: int
+            The port number of the connection.
+
+        Returns
+        -------
+        None
+        """
+        self.nats_client = await connect(f"nats://{host_address}:{port}")
+        if self.nats_client.connected_url is not None:

Review Comment:
   I'm not sure the else case can happen at all, because I think there should always be a `connected_url` if the connection was successful. I only added the `if` because type checking failed, as the URL could be `None`.
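One way to satisfy the type checker and handle the else case at the same time is to raise when the URL is `None`, which narrows `Optional[str]` to `str`. A minimal sketch; `require_connected_url` and `BrokerConnectionError` are hypothetical names, and the URL is modelled as a plain `str` rather than whatever type the nats client actually returns:

```python
from typing import Optional


class BrokerConnectionError(Exception):
    """Hypothetical exception for a client that reports no connected URL."""


def require_connected_url(connected_url: Optional[str]) -> str:
    """Return the URL, or raise if the client reports no connection.

    Raising in the None case narrows Optional[str] to str for type checkers
    and turns the otherwise silent else branch into an explicit error.
    """
    if connected_url is None:
        raise BrokerConnectionError("Connected, but the client reports no connected URL.")
    return connected_url


print(require_connected_url("nats://localhost:4222"))  # nats://localhost:4222
try:
    require_connected_url(None)
except BrokerConnectionError as err:
    print(f"Raised: {err}")
```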



[GitHub] [streampipes] bossenti merged pull request #855: [#854] First implementation of the StreamPipesFunctions in python

Posted by GitBox <gi...@apache.org>.
bossenti merged PR #855:
URL: https://github.com/apache/streampipes/pull/855


[GitHub] [streampipes] SvenO3 commented on a diff in pull request #855: [#854] First implementation of the StreamPipesFunctions in python

Posted by GitBox <gi...@apache.org>.
SvenO3 commented on code in PR #855:
URL: https://github.com/apache/streampipes/pull/855#discussion_r1045567396


##########
streampipes-client-python/streampipes_client/functions/function_handler.py:
##########
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import asyncio
+import json
+import logging
+from typing import AsyncIterator, Dict, List
+
+from streampipes_client.client.client import StreamPipesClient
+from streampipes_client.functions.broker.broker import Broker
+from streampipes_client.functions.broker.nats_broker import NatsBroker
+from streampipes_client.functions.registration import Registration
+from streampipes_client.functions.utils.async_iter_handler import AsyncIterHandler
+from streampipes_client.functions.utils.data_stream_context import DataStreamContext
+from streampipes_client.functions.utils.function_context import FunctionContext
+from streampipes_client.model.resource.data_stream import DataStream
+
+logger = logging.getLogger(__name__)
+
+
+# TODO Exception should be removed once all brokers are implemented.
+class UnsupportedBroker(Exception):
+    """Exception if a broker isn't implemented yet."""
+
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class FunctionHandler:
+    """The function handler manages the StreamPipesFunctions.
+    It controls the connection to the brokers, starts the functions, manages the broadcast of the live data
+    and is able to stop the connection to the brokers and functions.
+
+    Parameters
+    ----------
+    registration: Registration
+        The registration, that contains the StreamPipesFunctions.
+    client: StreamPipesClient
+        The client to interact with the API.
+    """
+
+    def __init__(self, registration: Registration, client: StreamPipesClient) -> None:
+        self.registration = registration
+        self.client = client
+        self.stream_contexts: Dict[str, DataStreamContext] = {}
+        self.brokers: List[Broker] = []
+
+    def initializeFunctions(self) -> None:
+        """Creates the context for every data stream and starts the event loop to manage the StreamPipesFunctions.
+
+        Returns
+        -------
+        None
+        """
+        # Choose the broker and collect the schema for every data stream
+        for streampipes_function in self.registration.getFunctions():
+            for stream_id in streampipes_function.requiredStreamIds():
+                # Get the data stream schema from the API
+                data_stream: DataStream = self.client.dataStreamApi.get(stream_id)  # type: ignore
+                # Get the broker
+                broker = self._get_broker(data_stream.event_grounding.transport_protocols[0].broker_hostname)
+                # Assign the functions, broker and schema to every stream
+                if stream_id in self.stream_contexts.keys():
+                    self.stream_contexts[stream_id].add_function(streampipes_function)
+                else:
+                    self.stream_contexts[stream_id] = DataStreamContext(
+                        functions=[streampipes_function], schema=data_stream, broker=broker
+                    )
+                logger.info(f"Using {broker.__class__.__name__} for {streampipes_function}")
+
+        # Start the function loop or add it as tasks if a loop is already running
+        try:
+            loop = asyncio.get_running_loop()
+        except RuntimeError:
+            asyncio.run(self._function_loop())
+        else:
+            loop.create_task(self._function_loop())
+
+    def _get_broker(self, broker_name: str) -> Broker:  # TODO implementation for more transport_protocols
+        """Get a broker by a name.
+
+        Parameters
+        ----------
+        broker_name: str
+            A string that represents a broker.
+
+        Returns
+        -------
+        The broker which belongs to the name.
+        """
+        if broker_name == "nats":

Review Comment:
   That's a good idea, I can do that.



[GitHub] [streampipes] bossenti commented on a diff in pull request #855: [#854] First implementation of the StreamPipesFunctions in python

Posted by GitBox <gi...@apache.org>.
bossenti commented on code in PR #855:
URL: https://github.com/apache/streampipes/pull/855#discussion_r1045072407


##########
streampipes-client-python/streampipes_client/functions/function_handler.py:
##########
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import asyncio
+import json
+import logging
+from typing import AsyncIterator, Dict, List
+
+from streampipes_client.client.client import StreamPipesClient
+from streampipes_client.functions.broker.broker import Broker
+from streampipes_client.functions.broker.nats_broker import NatsBroker
+from streampipes_client.functions.registration import Registration
+from streampipes_client.functions.utils.async_iter_handler import AsyncIterHandler
+from streampipes_client.functions.utils.data_stream_context import DataStreamContext
+from streampipes_client.functions.utils.function_context import FunctionContext
+from streampipes_client.model.resource.data_stream import DataStream
+
+logger = logging.getLogger(__name__)
+
+
+# TODO Exception should be removed once all brokers are implemented.
+class UnsupportedBroker(Exception):
+    """Exception if a broker isn't implemented yet."""
+
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class FunctionHandler:
+    """The function handler manages the StreamPipesFunctions.
+    It controls the connection to the brokers, starts the functions, manages the broadcast of the live data
+    and is able to stop the connection to the brokers and functions.
+
+    Parameters
+    ----------
+    registration: Registration
+        The registration, that contains the StreamPipesFunctions.
+    client: StreamPipesClient
+        The client to interact with the API.
+    """
+
+    def __init__(self, registration: Registration, client: StreamPipesClient) -> None:
+        self.registration = registration
+        self.client = client

Review Comment:
   I think this somewhat breaks the basic idea of how we want to structure the API of the Python client.
   But that's not really important for now; I'll give it some thought and come up with a solution.



##########
streampipes-client-python/streampipes_client/functions/broker/nats_broker.py:
##########
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+from typing import AsyncIterator
+
+from nats import connect
+from nats.aio.client import Client
+from nats.aio.subscription import Subscription
+from streampipes_client.functions.broker.broker import Broker
+
+logger = logging.getLogger(__name__)
+
+
+class NatsBroker(Broker):
+    """Implementation of the NatsBroker"""
+
+    nats_client: Client

Review Comment:
   These should be instance variables rather than class variables, right?
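For context on this question: in Python, a bare annotation in a class body such as `nats_client: Client` only records a type hint and creates no class attribute, so each `NatsBroker` instance gets its own `nats_client` once `_makeConnection` assigns it. Initializing the attributes in `__init__` makes that explicit. A small sketch, using `str` as a stand-in for the nats `Client` type; the class names are hypothetical:

```python
from typing import Optional


class AnnotatedOnly:
    # A bare annotation in the class body records a type hint but creates no attribute
    nats_client: Optional[str]


class InitializedInInit:
    def __init__(self) -> None:
        # Each instance gets its own attribute when the object is created
        self.nats_client: Optional[str] = None


class SharedClassAttribute:
    subscriptions: list = []  # an assigned class attribute: one list shared by all instances


print(hasattr(AnnotatedOnly, "nats_client"))  # False
print("nats_client" in AnnotatedOnly.__annotations__)  # True
print(hasattr(AnnotatedOnly(), "nats_client"))  # False until something assigns it

a, b = InitializedInInit(), InitializedInInit()
a.nats_client = "nats://host-a:4222"
print(b.nats_client)  # None: instances do not share the value

x, y = SharedClassAttribute(), SharedClassAttribute()
x.subscriptions.append("stream-1")
print(y.subscriptions)  # ['stream-1']: the mutable class attribute is shared
```

The real pitfall is an assigned class attribute holding a mutable object, as the last class shows; bare annotations avoid that, but `__init__` documents the instance state more clearly.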



##########
streampipes-client-python/streampipes_client/functions/broker/broker.py:
##########
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+from typing import AsyncIterator
+
+from streampipes_client.model.resource.data_stream import DataStream
+
+
+class Broker(ABC):
+    """Abstract implementation of a broker.
+    A broker is used to subscribe to a data stream and to consume the published events.
+    """
+
+    async def connect(self, data_stream: DataStream, host_address: str) -> None:

Review Comment:
   What is the intention behind making the `connect` method asynchronous?
   Usually, you should wait until the connection has been established successfully, right?
   Or am I missing something here?



##########
streampipes-client-python/streampipes_client/functions/function_handler.py:
##########
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import asyncio
+import json
+import logging
+from typing import AsyncIterator, Dict, List
+
+from streampipes_client.client.client import StreamPipesClient
+from streampipes_client.functions.broker.broker import Broker
+from streampipes_client.functions.broker.nats_broker import NatsBroker
+from streampipes_client.functions.registration import Registration
+from streampipes_client.functions.utils.async_iter_handler import AsyncIterHandler
+from streampipes_client.functions.utils.data_stream_context import DataStreamContext
+from streampipes_client.functions.utils.function_context import FunctionContext
+from streampipes_client.model.resource.data_stream import DataStream
+
+logger = logging.getLogger(__name__)
+
+
+# TODO Exception should be removed once all brokers are implemented.
+class UnsupportedBroker(Exception):
+    """Exception if a broker isn't implemented yet."""
+
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class FunctionHandler:
+    """The function handler manages the StreamPipesFunctions.
+    It controls the connection to the brokers, starts the functions, manages the broadcast of the live data
+    and is able to stop the connection to the brokers and functions.
+
+    Parameters
+    ----------
+    registration: Registration
+        The registration, that contains the StreamPipesFunctions.
+    client: StreamPipesClient
+        The client to interact with the API.
+    """
+
+    def __init__(self, registration: Registration, client: StreamPipesClient) -> None:
+        self.registration = registration
+        self.client = client
+        self.stream_contexts: Dict[str, DataStreamContext] = {}
+        self.brokers: List[Broker] = []
+
+    def initializeFunctions(self) -> None:
+        """Creates the context for every data stream and starts the event loop to manage the StreamPipesFunctions.
+
+        Returns
+        -------
+        None
+        """
+        # Choose the broker and collect the schema for every data stream
+        for streampipes_function in self.registration.getFunctions():
+            for stream_id in streampipes_function.requiredStreamIds():
+                # Get the data stream schema from the API
+                data_stream: DataStream = self.client.dataStreamApi.get(stream_id)  # type: ignore
+                # Get the broker
+                broker = self._get_broker(data_stream.event_grounding.transport_protocols[0].broker_hostname)
+                # Assign the functions, broker and schema to every stream
+                if stream_id in self.stream_contexts.keys():
+                    self.stream_contexts[stream_id].add_function(streampipes_function)
+                else:
+                    self.stream_contexts[stream_id] = DataStreamContext(
+                        functions=[streampipes_function], schema=data_stream, broker=broker
+                    )
+                logger.info(f"Using {broker.__class__.__name__} for {streampipes_function}")
+
+        # Start the function loop or add it as tasks if a loop is already running
+        try:
+            loop = asyncio.get_running_loop()
+        except RuntimeError:
+            asyncio.run(self._function_loop())
+        else:
+            loop.create_task(self._function_loop())
+
+    def _get_broker(self, broker_name: str) -> Broker:  # TODO implementation for more transport_protocols
+        """Get a broker by a name.
+
+        Parameters
+        ----------
+        broker_name: str
+            A string that represents a broker.
+
+        Returns
+        -------
+        The broker which belongs to the name.
+        """
+        if broker_name == "nats":

Review Comment:
   Would you mind creating an enum for the supported brokers?
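A minimal sketch of such an enum, assuming the enum value matches the `"nats"` string compared in `_get_broker`; `SupportedBrokers`, `_BROKER_IMPLEMENTATIONS`, and `get_broker` are hypothetical names, and `Broker`, `NatsBroker`, and `UnsupportedBroker` are empty placeholders for the PR's classes:

```python
from enum import Enum
from typing import Dict, Type


class Broker:
    """Placeholder for the abstract Broker from the PR."""


class NatsBroker(Broker):
    """Placeholder for the PR's NatsBroker."""


class UnsupportedBroker(Exception):
    """Placeholder for the PR's UnsupportedBroker exception."""


class SupportedBrokers(Enum):
    """Hypothetical enum of the brokers the Python client can handle."""

    NATS = "nats"


# Maps each enum member to the class implementing it
_BROKER_IMPLEMENTATIONS: Dict[SupportedBrokers, Type[Broker]] = {
    SupportedBrokers.NATS: NatsBroker,
}


def get_broker(broker_name: str) -> Broker:
    """Instantiate the broker for a name instead of comparing raw strings."""
    try:
        supported = SupportedBrokers(broker_name)  # lookup by value
    except ValueError:
        raise UnsupportedBroker(f"The broker '{broker_name}' is not supported yet.") from None
    return _BROKER_IMPLEMENTATIONS[supported]()


print(type(get_broker("nats")).__name__)  # NatsBroker
```

Looking members up by value turns an unknown name into a `ValueError`, which the sketch re-raises as `UnsupportedBroker`; adding a broker then means adding one enum member and one mapping entry.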



##########
streampipes-client-python/streampipes_client/functions/broker/nats_broker.py:
##########
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+from typing import AsyncIterator
+
+from nats import connect
+from nats.aio.client import Client
+from nats.aio.subscription import Subscription
+from streampipes_client.functions.broker.broker import Broker
+
+logger = logging.getLogger(__name__)
+
+
+class NatsBroker(Broker):
+    """Implementation of the NatsBroker"""
+
+    nats_client: Client
+    subscription: Subscription
+
+    async def _makeConnection(self, host_address: str, port: int) -> None:
+        """Helper function to connect to a server.
+
+        Parameters
+        ----------
+
+        host_address: str
+            The host address of the server, which the broker connects to.
+
+        port: int
+            The port number of the connection.
+
+        Returns
+        -------
+        None
+        """
+        self.nats_client = await connect(f"nats://{host_address}:{port}")
+        if self.nats_client.connected_url is not None:

Review Comment:
   The `else` branch should raise an exception here.



[GitHub] [streampipes] bossenti commented on pull request #855: [#854] First implementation of the StreamPipesFunctions in python

Posted by GitBox <gi...@apache.org>.
bossenti commented on PR #855:
URL: https://github.com/apache/streampipes/pull/855#issuecomment-1341245882

   @SvenO3 congratulations on your first PR as a StreamPipes committer 🥳
   Looks very promising; I'll take a deeper look at it in the coming days!

