You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/02/20 08:58:41 UTC
[streampipes] branch dev updated: Update data stream generator (#1258) (#1286)
This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b693a2fd Update data stream generator (#1258) (#1286)
7b693a2fd is described below
commit 7b693a2fd2ca804156dce016c7bf1c94645f166a
Author: Sven Oehler <43...@users.noreply.github.com>
AuthorDate: Mon Feb 20 09:58:36 2023 +0100
Update data stream generator (#1258) (#1286)
* Update data stream generator
* Fix data stream name
* Update default values
---
.../streampipes/function_zoo/river_function.py | 2 +-
.../functions/utils/data_stream_generator.py | 114 +++++----------------
.../streampipes/model/common.py | 52 +++++++---
.../streampipes/model/resource/data_stream.py | 36 ++++---
4 files changed, 82 insertions(+), 122 deletions(-)
diff --git a/streampipes-client-python/streampipes/function_zoo/river_function.py b/streampipes-client-python/streampipes/function_zoo/river_function.py
index 2b73b3ceb..ac142c989 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -151,7 +151,7 @@ class OnlineML:
):
self.client = client
- attributes = {"learning": "boolean", "prediction": prediction_type}
+ attributes = {"learning": RuntimeType.BOOLEAN.value, "prediction": prediction_type}
if supervised:
attributes["truth"] = prediction_type
if target_label is None:
diff --git a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
index 86bf8cdb5..e11657e44 100644
--- a/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
+++ b/streampipes-client-python/streampipes/functions/utils/data_stream_generator.py
@@ -15,26 +15,13 @@
# limitations under the License.
#
-import random
-import string
from enum import Enum
from typing import Dict, Optional
-from uuid import uuid4
+from streampipes.model.common import EventProperty, EventSchema
from streampipes.model.resource.data_stream import DataStream
-def random_letters(n: int):
- """Generates n random letters.
-
- Parameters
- ----------
- n: int
- number of letters
- """
- return "".join(random.choice(string.ascii_letters) for _ in range(n))
-
-
class RuntimeType(Enum):
"""Runtime types for the attributes of a data stream."""
@@ -59,79 +46,28 @@ def create_data_stream(name: str, attributes: Dict[str, str], stream_id: Optiona
stream_id: str
The id of this data stream.
"""
- if not stream_id:
- stream_id = f"sp:spdatastream:{random_letters(6)}"
- return DataStream(
- **{
- "@class": "org.apache.streampipes.model.SpDataStream",
- "elementId": stream_id,
- "dom": None,
- "connectedTo": None,
- "name": name,
- "description": "",
- "iconUrl": None,
- "appId": None,
- "includesAssets": False,
- "includesLocales": False,
- "includedAssets": [],
- "includedLocales": [],
- "internallyManaged": False,
- "eventGrounding": {
- "transportProtocols": [
- {
- "@class": "org.apache.streampipes.model.grounding.NatsTransportProtocol",
- "elementId": f"sp:transportprotocol:{random_letters(6)}",
- "brokerHostname": "nats",
- "topicDefinition": {
- "@class": "org.apache.streampipes.model.grounding.SimpleTopicDefinition",
- "actualTopicName": f"org.apache.streampipes.connect.{uuid4()}",
- },
- "port": 4222,
- }
- ],
- "transportFormats": [{"rdfType": ["http://sepa.event-processing.org/sepa#json"]}],
- },
- "eventSchema": {
- "eventProperties": [
- {
- "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
- "elementId": f"sp:eventproperty:{random_letters(6)}",
- "label": "timestamp",
- "description": None,
- "runtimeName": "timestamp",
- "required": False,
- "domainProperties": ["http://schema.org/DateTime"],
- "propertyScope": "HEADER_PROPERTY",
- "index": 0,
- "runtimeId": None,
- "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
- "measurementUnit": None,
- "valueSpecification": None,
- }
- ]
- + [
- {
- "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
- "elementId": f"sp:eventproperty:{random_letters(6)}",
- "label": attribute_name,
- "description": None,
- "runtimeName": attribute_name,
- "required": False,
- "domainProperties": [],
- "propertyScope": "MEASUREMENT_PROPERTY",
- "index": i,
- "runtimeId": None,
- "runtimeType": f"http://www.w3.org/2001/XMLSchema#{attribute_type}",
- "measurementUnit": None,
- "valueSpecification": None,
- }
- for i, (attribute_name, attribute_type) in enumerate(attributes.items(), start=1)
- ]
- },
- "category": None,
- "index": 0,
- "correspondingAdapterId": None,
- "uri": stream_id,
- "_rev": None,
- }
+
+ event_schema = EventSchema(
+ event_properties=[
+ EventProperty(
+ label="timestamp",
+ runtime_name="timestamp",
+ domain_properties=["http://schema.org/DateTime"],
+ property_scope="HEADER_PROPERTY",
+ runtime_type="http://www.w3.org/2001/XMLSchema#long",
+ )
+ ]
+ + [
+ EventProperty(
+ label=attribute_name,
+ runtime_name=attribute_name,
+ index=i,
+ runtime_type=f"http://www.w3.org/2001/XMLSchema#{attribute_type}",
+ )
+ for i, (attribute_name, attribute_type) in enumerate(attributes.items(), start=1)
+ ]
)
+
+ if not stream_id:
+ return DataStream(name=name, event_schema=event_schema)
+ return DataStream(element_id=stream_id, name=name, event_schema=event_schema)
diff --git a/streampipes-client-python/streampipes/model/common.py b/streampipes-client-python/streampipes/model/common.py
index fb7bcd797..edc24b955 100644
--- a/streampipes-client-python/streampipes/model/common.py
+++ b/streampipes-client-python/streampipes/model/common.py
@@ -19,7 +19,10 @@
Classes of the StreamPipes data model that are commonly shared.
"""
+import random
+import string
from typing import List, Optional
+from uuid import uuid4
from pydantic import BaseModel, Field, StrictBool, StrictInt, StrictStr
@@ -30,6 +33,17 @@ __all__ = [
]
+def random_letters(n: int):
+ """Generates n random letters.
+
+ Parameters
+ ----------
+ n: int
+ number of letters
+ """
+ return "".join(random.choice(string.ascii_letters) for _ in range(n))
+
+
def _snake_to_camel_case(snake_case_string: str) -> str:
"""Converts a string in snake_case format to camelCase style."""
@@ -73,17 +87,17 @@ class EventProperty(BasicModel):
Data model of an `EventProperty` in compliance to the StreamPipes Backend.
"""
- class_name: Optional[StrictStr] = Field(alias="@class")
- element_id: Optional[StrictStr]
+ class_name: StrictStr = Field(alias="@class", default="org.apache.streampipes.model.schema.EventPropertyPrimitive")
+ element_id: StrictStr = Field(default_factory=lambda: f"sp:eventproperty:{random_letters(6)}")
label: Optional[StrictStr]
description: Optional[StrictStr]
runtime_name: StrictStr
- required: StrictBool
- domain_properties: List[StrictStr]
- property_scope: Optional[StrictStr]
- index: StrictInt
+ required: StrictBool = Field(default=False)
+ domain_properties: List[StrictStr] = Field(default_factory=list)
+ property_scope: StrictStr = Field(default="MEASUREMENT_PROPERTY")
+ index: StrictInt = Field(default=0)
runtime_id: Optional[StrictStr]
- runtime_type: Optional[StrictStr]
+ runtime_type: StrictStr = Field(default="http://www.w3.org/2001/XMLSchema#string")
measurement_unit: Optional[StrictStr]
value_specification: Optional[ValueSpecification]
@@ -115,8 +129,10 @@ class TopicDefinition(BasicModel):
Data model of a `TopicDefinition` in compliance to the StreamPipes Backend.
"""
- class_name: Optional[StrictStr] = Field(alias="@class")
- actual_topic_name: StrictStr
+ class_name: Optional[StrictStr] = Field(
+ alias="@class", default="org.apache.streampipes.model.grounding.SimpleTopicDefinition"
+ )
+ actual_topic_name: StrictStr = Field(default_factory=lambda: f"org.apache.streampipes.connect.{uuid4()}")
class TransportProtocol(BasicModel):
@@ -124,11 +140,13 @@ class TransportProtocol(BasicModel):
Data model of a `TransportProtocol` in compliance to the StreamPipes Backend.
"""
- class_name: StrictStr = Field(alias="@class")
- element_id: Optional[StrictStr]
- broker_hostname: StrictStr
- topic_definition: TopicDefinition
- port: StrictInt = Field(alias="kafkaPort")
+ class_name: StrictStr = Field(
+ alias="@class", default="org.apache.streampipes.model.grounding.NatsTransportProtocol"
+ )
+ element_id: StrictStr = Field(default_factory=lambda: f"sp:transportprotocol:{random_letters(6)}")
+ broker_hostname: StrictStr = Field(default="nats")
+ topic_definition: TopicDefinition = Field(default_factory=TopicDefinition)
+ port: StrictInt = Field(alias="kafkaPort", default=4222)
class TransportFormat(BasicModel):
@@ -136,7 +154,7 @@ class TransportFormat(BasicModel):
Data model of a `TransportFormat` in compliance to the StreamPipes Backend.
"""
- rdf_type: Optional[List[Optional[StrictStr]]]
+ rdf_type: List[StrictStr] = Field(default=["http://sepa.event-processing.org/sepa#json"])
class EventGrounding(BasicModel):
@@ -144,8 +162,8 @@ class EventGrounding(BasicModel):
Data model of an `EventGrounding` in compliance to the StreamPipes Backend.
"""
- transport_protocols: List[TransportProtocol]
- transport_formats: Optional[List[Optional[TransportFormat]]]
+ transport_protocols: List[TransportProtocol] = Field(default_factory=lambda: [TransportProtocol()])
+ transport_formats: List[TransportFormat] = Field(default_factory=lambda: [TransportFormat()])
class MeasurementCapability(BasicModel):
diff --git a/streampipes-client-python/streampipes/model/resource/data_stream.py b/streampipes-client-python/streampipes/model/resource/data_stream.py
index 97d049b2d..d053eb574 100644
--- a/streampipes-client-python/streampipes/model/resource/data_stream.py
+++ b/streampipes-client-python/streampipes/model/resource/data_stream.py
@@ -23,6 +23,7 @@ from streampipes.model.common import (
EventSchema,
MeasurementCapability,
MeasurementObject,
+ random_letters,
)
from streampipes.model.resource.resource import Resource
@@ -72,26 +73,31 @@ class DataStream(Resource):
"num_included_locales": len(self.included_locales) if self.included_locales is not None else 0,
}
- class_name: Optional[StrictStr] = Field(alias="@class")
- element_id: StrictStr
- name: Optional[StrictStr]
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ if not self.uri:
+ self.uri = self.element_id
+
+ class_name: StrictStr = Field(alias="@class", default_factory=lambda: "org.apache.streampipes.model.SpDataStream")
+ element_id: StrictStr = Field(default_factory=lambda: f"sp:spdatastream:{random_letters(6)}")
+ name: StrictStr = Field(default="Unnamed")
description: Optional[StrictStr]
icon_url: Optional[StrictStr]
app_id: Optional[StrictStr]
- includes_assets: Optional[StrictBool]
- includes_locales: Optional[StrictBool]
- included_assets: Optional[List[Optional[StrictStr]]]
- included_locales: Optional[List[Optional[StrictStr]]]
- application_links: Optional[List[Optional[ApplicationLink]]]
- internally_managed: Optional[StrictBool]
- connected_to: Optional[List[Optional[StrictStr]]]
- event_grounding: EventGrounding
+ includes_assets: StrictBool = Field(default=False)
+ includes_locales: StrictBool = Field(default=False)
+ included_assets: List[StrictStr] = Field(default_factory=list)
+ included_locales: List[StrictStr] = Field(default_factory=list)
+ application_links: List[ApplicationLink] = Field(default_factory=list)
+ internally_managed: StrictBool = Field(default=False)
+ connected_to: Optional[List[StrictStr]]
+ event_grounding: EventGrounding = Field(default_factory=EventGrounding)
event_schema: Optional[EventSchema]
- measurement_capability: Optional[List[Optional[MeasurementCapability]]]
- measurement_object: Optional[List[Optional[MeasurementObject]]]
- index: Optional[StrictInt]
+ measurement_capability: Optional[List[MeasurementCapability]]
+ measurement_object: Optional[List[MeasurementObject]]
+ index: StrictInt = Field(default=0)
corresponding_adapter_id: Optional[StrictStr]
- category: Optional[List[Optional[StrictStr]]]
+ category: Optional[List[StrictStr]]
uri: Optional[StrictStr]
dom: Optional[StrictStr]
rev: Optional[StrictStr] = Field(alias="_rev")