You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/02/20 08:58:41 UTC

[streampipes] branch dev updated: Update data stream generator (#1258) (#1286)

This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b693a2fd Update data stream generator (#1258) (#1286)
7b693a2fd is described below

commit 7b693a2fd2ca804156dce016c7bf1c94645f166a
Author: Sven Oehler <43...@users.noreply.github.com>
AuthorDate: Mon Feb 20 09:58:36 2023 +0100

    Update data stream generator (#1258) (#1286)
    
    * Update data stream generator
    
    * Fix data stream name
    
    * Update default values
---
 .../streampipes/function_zoo/river_function.py     |   2 +-
 .../functions/utils/data_stream_generator.py       | 114 +++++----------------
 .../streampipes/model/common.py                    |  52 +++++++---
 .../streampipes/model/resource/data_stream.py      |  36 ++++---
 4 files changed, 82 insertions(+), 122 deletions(-)

diff --git a/streampipes-client-python/streampipes/function_zoo/river_function.py b/streampipes-client-python/streampipes/function_zoo/river_function.py
index 2b73b3ceb..ac142c989 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -151,7 +151,7 @@ class OnlineML:
     ):
         self.client = client
 
-        attributes = {"learning": "boolean", "prediction": prediction_type}
+        attributes = {"learning": RuntimeType.BOOLEAN.value, "prediction": prediction_type}
         if supervised:
             attributes["truth"] = prediction_type
             if target_label is None:
diff --git a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
index 86bf8cdb5..e11657e44 100644
--- a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
+++ b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
@@ -15,26 +15,13 @@
 # limitations under the License.
 #
 
-import random
-import string
 from enum import Enum
 from typing import Dict, Optional
-from uuid import uuid4
 
+from streampipes.model.common import EventProperty, EventSchema
 from streampipes.model.resource.data_stream import DataStream
 
 
-def random_letters(n: int):
-    """Generates n random letters.
-
-    Parameters
-    ----------
-    n: int
-        number of letters
-    """
-    return "".join(random.choice(string.ascii_letters) for _ in range(n))
-
-
 class RuntimeType(Enum):
     """Runtime types for the attributes of a data stream."""
 
@@ -59,79 +46,28 @@ def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optiona
     stream_id: str
         The id of this data stream.
     """
-    if not stream_id:
-        stream_id = f"sp:spdatastream:{random_letters(6)}"
-    return DataStream(
-        **{
-            "@class": "org.apache.streampipes.model.SpDataStream",
-            "elementId": stream_id,
-            "dom": None,
-            "connectedTo": None,
-            "name": name,
-            "description": "",
-            "iconUrl": None,
-            "appId": None,
-            "includesAssets": False,
-            "includesLocales": False,
-            "includedAssets": [],
-            "includedLocales": [],
-            "internallyManaged": False,
-            "eventGrounding": {
-                "transportProtocols": [
-                    {
-                        "@class": "org.apache.streampipes.model.grounding.NatsTransportProtocol",
-                        "elementId": f"sp:transportprotocol:{random_letters(6)}",
-                        "brokerHostname": "nats",
-                        "topicDefinition": {
-                            "@class": "org.apache.streampipes.model.grounding.SimpleTopicDefinition",
-                            "actualTopicName": f"org.apache.streampipes.connect.{uuid4()}",
-                        },
-                        "port": 4222,
-                    }
-                ],
-                "transportFormats": [{"rdfType": ["http://sepa.event-processing.org/sepa#json"]}],
-            },
-            "eventSchema": {
-                "eventProperties": [
-                    {
-                        "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
-                        "elementId": f"sp:eventproperty:{random_letters(6)}",
-                        "label": "timestamp",
-                        "description": None,
-                        "runtimeName": "timestamp",
-                        "required": False,
-                        "domainProperties": ["http://schema.org/DateTime"],
-                        "propertyScope": "HEADER_PROPERTY",
-                        "index": 0,
-                        "runtimeId": None,
-                        "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
-                        "measurementUnit": None,
-                        "valueSpecification": None,
-                    }
-                ]
-                + [
-                    {
-                        "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
-                        "elementId": f"sp:eventproperty:{random_letters(6)}",
-                        "label": attribute_name,
-                        "description": None,
-                        "runtimeName": attribute_name,
-                        "required": False,
-                        "domainProperties": [],
-                        "propertyScope": "MEASUREMENT_PROPERTY",
-                        "index": i,
-                        "runtimeId": None,
-                        "runtimeType": f"http://www.w3.org/2001/XMLSchema#{attribute_type}",
-                        "measurementUnit": None,
-                        "valueSpecification": None,
-                    }
-                    for i, (attribute_name, attribute_type) in enumerate(attributes.items(), start=1)
-                ]
-            },
-            "category": None,
-            "index": 0,
-            "correspondingAdapterId": None,
-            "uri": stream_id,
-            "_rev": None,
-        }
+
+    event_schema = EventSchema(
+        event_properties=[
+            EventProperty(
+                label="timestamp",
+                runtime_name="timestamp",
+                domain_properties=["http://schema.org/DateTime"],
+                property_scope="HEADER_PROPERTY",
+                runtime_type="http://www.w3.org/2001/XMLSchema#long",
+            )
+        ]
+        + [
+            EventProperty(
+                label=attribute_name,
+                runtime_name=attribute_name,
+                index=i,
+                runtime_type=f"http://www.w3.org/2001/XMLSchema#{attribute_type}",
+            )
+            for i, (attribute_name, attribute_type) in enumerate(attributes.items(), start=1)
+        ]
     )
+
+    if not stream_id:
+        return DataStream(name=name, event_schema=event_schema)
+    return DataStream(element_id=stream_id, name=name, event_schema=event_schema)
diff --git a/streampipes-client-python/streampipes/model/common.py b/streampipes-client-python/streampipes/model/common.py
index fb7bcd797..edc24b955 100644
--- a/streampipes-client-python/streampipes/model/common.py
+++ b/streampipes-client-python/streampipes/model/common.py
@@ -19,7 +19,10 @@
 Classes of the StreamPipes data model that are commonly shared.
 """
 
+import random
+import string
 from typing import List, Optional
+from uuid import uuid4
 
 from pydantic import BaseModel, Field, StrictBool, StrictInt, StrictStr
 
@@ -30,6 +33,17 @@ __all__ = [
 ]
 
 
+def random_letters(n: int):
+    """Generates n random letters.
+
+    Parameters
+    ----------
+    n: int
+        number of letters
+    """
+    return "".join(random.choice(string.ascii_letters) for _ in range(n))
+
+
 def _snake_to_camel_case(snake_case_string: str) -> str:
     """Converts a string in snake_case format to camelCase style."""
 
@@ -73,17 +87,17 @@ class EventProperty(BasicModel):
     Data model of an `EventProperty` in compliance to the StreamPipes Backend.
     """
 
-    class_name: Optional[StrictStr] = Field(alias="@class")
-    element_id: Optional[StrictStr]
+    class_name: StrictStr = Field(alias="@class", default="org.apache.streampipes.model.schema.EventPropertyPrimitive")
+    element_id: StrictStr = Field(default_factory=lambda: f"sp:eventproperty:{random_letters(6)}")
     label: Optional[StrictStr]
     description: Optional[StrictStr]
     runtime_name: StrictStr
-    required: StrictBool
-    domain_properties: List[StrictStr]
-    property_scope: Optional[StrictStr]
-    index: StrictInt
+    required: StrictBool = Field(default=False)
+    domain_properties: List[StrictStr] = Field(default_factory=list)
+    property_scope: StrictStr = Field(default="MEASUREMENT_PROPERTY")
+    index: StrictInt = Field(default=0)
     runtime_id: Optional[StrictStr]
-    runtime_type: Optional[StrictStr]
+    runtime_type: StrictStr = Field(default="http://www.w3.org/2001/XMLSchema#string")
     measurement_unit: Optional[StrictStr]
     value_specification: Optional[ValueSpecification]
 
@@ -115,8 +129,10 @@ class TopicDefinition(BasicModel):
     Data model of a `TopicDefinition` in compliance to the StreamPipes Backend.
     """
 
-    class_name: Optional[StrictStr] = Field(alias="@class")
-    actual_topic_name: StrictStr
+    class_name: Optional[StrictStr] = Field(
+        alias="@class", default="org.apache.streampipes.model.grounding.SimpleTopicDefinition"
+    )
+    actual_topic_name: StrictStr = Field(default_factory=lambda: f"org.apache.streampipes.connect.{uuid4()}")
 
 
 class TransportProtocol(BasicModel):
@@ -124,11 +140,13 @@ class TransportProtocol(BasicModel):
     Data model of a `TransportProtocol` in compliance to the StreamPipes Backend.
     """
 
-    class_name: StrictStr = Field(alias="@class")
-    element_id: Optional[StrictStr]
-    broker_hostname: StrictStr
-    topic_definition: TopicDefinition
-    port: StrictInt = Field(alias="kafkaPort")
+    class_name: StrictStr = Field(
+        alias="@class", default="org.apache.streampipes.model.grounding.NatsTransportProtocol"
+    )
+    element_id: StrictStr = Field(default_factory=lambda: f"sp:transportprotocol:{random_letters(6)}")
+    broker_hostname: StrictStr = Field(default="nats")
+    topic_definition: TopicDefinition = Field(default_factory=TopicDefinition)
+    port: StrictInt = Field(alias="kafkaPort", default=4222)
 
 
 class TransportFormat(BasicModel):
@@ -136,7 +154,7 @@ class TransportFormat(BasicModel):
     Data model of a `TransportFormat` in compliance to the StreamPipes Backend.
     """
 
-    rdf_type: Optional[List[Optional[StrictStr]]]
+    rdf_type: List[StrictStr] = Field(default=["http://sepa.event-processing.org/sepa#json"])
 
 
 class EventGrounding(BasicModel):
@@ -144,8 +162,8 @@ class EventGrounding(BasicModel):
     Data model of an `EventGrounding` in compliance to the StreamPipes Backend.
     """
 
-    transport_protocols: List[TransportProtocol]
-    transport_formats: Optional[List[Optional[TransportFormat]]]
+    transport_protocols: List[TransportProtocol] = Field(default_factory=lambda: [TransportProtocol()])
+    transport_formats: List[TransportFormat] = Field(default_factory=lambda: [TransportFormat()])
 
 
 class MeasurementCapability(BasicModel):
diff --git a/streampipes-client-python/streampipes/model/resource/data_stream.py b/streampipes-client-python/streampipes/model/resource/data_stream.py
index 97d049b2d..d053eb574 100644
--- a/streampipes-client-python/streampipes/model/resource/data_stream.py
+++ b/streampipes-client-python/streampipes/model/resource/data_stream.py
@@ -23,6 +23,7 @@ from streampipes.model.common import (
     EventSchema,
     MeasurementCapability,
     MeasurementObject,
+    random_letters,
 )
 from streampipes.model.resource.resource import Resource
 
@@ -72,26 +73,31 @@ class DataStream(Resource):
             "num_included_locales": len(self.included_locales) if self.included_locales is not None else 0,
         }
 
-    class_name: Optional[StrictStr] = Field(alias="@class")
-    element_id: StrictStr
-    name: Optional[StrictStr]
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        if not self.uri:
+            self.uri = self.element_id
+
+    class_name: StrictStr = Field(alias="@class", default_factory=lambda: "org.apache.streampipes.model.SpDataStream")
+    element_id: StrictStr = Field(default_factory=lambda: f"sp:spdatastream:{random_letters(6)}")
+    name: StrictStr = Field(default="Unnamed")
     description: Optional[StrictStr]
     icon_url: Optional[StrictStr]
     app_id: Optional[StrictStr]
-    includes_assets: Optional[StrictBool]
-    includes_locales: Optional[StrictBool]
-    included_assets: Optional[List[Optional[StrictStr]]]
-    included_locales: Optional[List[Optional[StrictStr]]]
-    application_links: Optional[List[Optional[ApplicationLink]]]
-    internally_managed: Optional[StrictBool]
-    connected_to: Optional[List[Optional[StrictStr]]]
-    event_grounding: EventGrounding
+    includes_assets: StrictBool = Field(default=False)
+    includes_locales: StrictBool = Field(default=False)
+    included_assets: List[StrictStr] = Field(default_factory=list)
+    included_locales: List[StrictStr] = Field(default_factory=list)
+    application_links: List[ApplicationLink] = Field(default_factory=list)
+    internally_managed: StrictBool = Field(default=False)
+    connected_to: Optional[List[StrictStr]]
+    event_grounding: EventGrounding = Field(default_factory=EventGrounding)
     event_schema: Optional[EventSchema]
-    measurement_capability: Optional[List[Optional[MeasurementCapability]]]
-    measurement_object: Optional[List[Optional[MeasurementObject]]]
-    index: Optional[StrictInt]
+    measurement_capability: Optional[List[MeasurementCapability]]
+    measurement_object: Optional[List[MeasurementObject]]
+    index: StrictInt = Field(default=0)
     corresponding_adapter_id: Optional[StrictStr]
-    category: Optional[List[Optional[StrictStr]]]
+    category: Optional[List[StrictStr]]
     uri: Optional[StrictStr]
     dom: Optional[StrictStr]
     rev: Optional[StrictStr] = Field(alias="_rev")