You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/02/13 16:01:40 UTC
[streampipes] branch rel/0.91.0 updated: [#1245] provide a temporary workaround for inconsistency in eventGrounding (#1250)
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch rel/0.91.0
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/rel/0.91.0 by this push:
new 1312087bb [#1245] provide a temporary workaround for inconsistency in eventGrounding (#1250)
1312087bb is described below
commit 1312087bb6bfc0f7b4c32381752b842f47b49b6f
Author: Tim <50...@users.noreply.github.com>
AuthorDate: Mon Feb 13 16:53:30 2023 +0100
[#1245] provide a temporary workaround for inconsistency in eventGrounding (#1250)
* [#1245] provide a temporary workaround for inconsistency in eventGrounding
* fix linting
Signed-off-by: bossenti <bo...@posteo.de>
* add missing Apache headers
Signed-off-by: bossenti <bo...@posteo.de>
---------
Signed-off-by: bossenti <bo...@posteo.de>
---
.../streampipes/model/common.py | 3 +-
.../streampipes/model/resource/data_stream.py | 32 ++++++++++
.../tests/client/test_endpoint.py | 2 +-
streampipes-client-python/tests/model/__init__.py | 16 +++++
.../tests/model/resource/__init__.py | 16 +++++
.../tests/model/resource/test_data_stream.py | 73 ++++++++++++++++++++++
6 files changed, 140 insertions(+), 2 deletions(-)
diff --git a/streampipes-client-python/streampipes/model/common.py b/streampipes-client-python/streampipes/model/common.py
index c72cba3d8..fb7bcd797 100644
--- a/streampipes-client-python/streampipes/model/common.py
+++ b/streampipes-client-python/streampipes/model/common.py
@@ -47,6 +47,7 @@ class BasicModel(BaseModel):
"""
alias_generator = _snake_to_camel_case
+ allow_population_by_field_name = True
class BaseElement(BasicModel):
@@ -127,7 +128,7 @@ class TransportProtocol(BasicModel):
element_id: Optional[StrictStr]
broker_hostname: StrictStr
topic_definition: TopicDefinition
- port: StrictInt
+ port: StrictInt = Field(alias="kafkaPort")
class TransportFormat(BasicModel):
diff --git a/streampipes-client-python/streampipes/model/resource/data_stream.py b/streampipes-client-python/streampipes/model/resource/data_stream.py
index 073160d02..97d049b2d 100644
--- a/streampipes-client-python/streampipes/model/resource/data_stream.py
+++ b/streampipes-client-python/streampipes/model/resource/data_stream.py
@@ -95,3 +95,35 @@ class DataStream(Resource):
uri: Optional[StrictStr]
dom: Optional[StrictStr]
rev: Optional[StrictStr] = Field(alias="_rev")
+
+ def to_dict(self, use_source_names=True):
+ """Returns the resource in dictionary representation.
+
+ Parameters
+ ----------
+ use_source_names: bool
+ If True, the dictionary keys are named as in the StreamPipes backend;
+ if False, the Python attribute names are used
+
+ Returns
+ -------
+ resource: Dict[str, Any]
+ The resource as dictionary representation
+
+ """
+
+ # Temporary workaround for https://github.com/apache/streampipes/issues/1245;
+ # it should be removed as soon as possible
+
+ resource_dict = self.dict(by_alias=use_source_names)
+
+ if (
+ use_source_names
+ and (transport_protocol_dict := resource_dict["eventGrounding"]["transportProtocols"][0])["@class"]
+ != "org.apache.streampipes.model.grounding.KafkaTransportProtocol"
+ ):
+ port = transport_protocol_dict.pop("kafkaPort")
+ transport_protocol_dict.update({"port": port})
+ resource_dict["eventGrounding"]["transportProtocols"][0] = transport_protocol_dict
+
+ return resource_dict
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
index c0c8d870c..16a2b6d9d 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -212,7 +212,7 @@ class TestStreamPipesEndpoints(TestCase):
any_order=True,
)
self.assertTrue(isinstance(result, DataStream))
- self.assertEqual(result.dict(by_alias=True), self.data_stream_get)
+ self.assertEqual(result.to_dict(use_source_names=True), self.data_stream_get)
@patch("streampipes.client.client.Session", autospec=True)
def test_endpoint_post(self, http_session: MagicMock):
diff --git a/streampipes-client-python/tests/model/__init__.py b/streampipes-client-python/tests/model/__init__.py
new file mode 100644
index 000000000..cce3acad3
--- /dev/null
+++ b/streampipes-client-python/tests/model/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/streampipes-client-python/tests/model/resource/__init__.py b/streampipes-client-python/tests/model/resource/__init__.py
new file mode 100644
index 000000000..cce3acad3
--- /dev/null
+++ b/streampipes-client-python/tests/model/resource/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/streampipes-client-python/tests/model/resource/test_data_stream.py b/streampipes-client-python/tests/model/resource/test_data_stream.py
new file mode 100644
index 000000000..964516a63
--- /dev/null
+++ b/streampipes-client-python/tests/model/resource/test_data_stream.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from unittest import TestCase
+
+from streampipes.model.resource import DataStream
+
+
+class TestDataStreamWorkaround(TestCase):
+ """
+ Test case that ensures the behavior of the workaround introduced
+ as a temporary fix for https://github.com/apache/streampipes/issues/1245.
+ Needs to be removed as soon as possible
+ """
+
+ def test_nats_case(self):
+ data_stream_def = {
+ "elementId": "some-random-id",
+ "eventGrounding": {
+ "transportProtocols": [
+ {
+ "@class": "org.apache.streampipes.model.grounding.NatsTransportProtocol",
+ "brokerHostname": "broker-host-name",
+ "topicDefinition": {
+ "@class": "some-class-name",
+ "actualTopicName": "actual-topic-name"
+ },
+ "port": 50
+ }
+ ]
+ }
+ }
+
+ data_stream = DataStream.parse_obj(data_stream_def)
+
+ self.assertEqual(50, data_stream.to_dict()["eventGrounding"]["transportProtocols"][0]["port"])
+
+ def test_kafka_case(self):
+
+ data_stream_def = {
+ "elementId": "some-random-id",
+ "eventGrounding": {
+ "transportProtocols": [
+ {
+ "@class": "org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+ "brokerHostname": "broker-host-name",
+ "topicDefinition": {
+ "@class": "some-class-name",
+ "actualTopicName": "actual-topic-name"
+ },
+ "kafkaPort": 50
+ }
+ ]
+ }
+ }
+
+ data_stream = DataStream.parse_obj(data_stream_def)
+
+ self.assertEqual(50, data_stream.to_dict()["eventGrounding"]["transportProtocols"][0]["kafkaPort"])