You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/01/12 08:37:26 UTC

[streampipes] 01/02: [#1085] Add getter for event schema to AdapterDescription

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1085
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 4981f1208c209a2093c95440c5ccc75e2626eaf7
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Jan 12 09:36:12 2023 +0100

    [#1085] Add getter for event schema to AdapterDescription
---
 .../extensions/api/connect/IAdapterPipeline.java   |  4 +
 .../extensions/api/connect/IProtocol.java          |  3 -
 .../connect/adapter/AdapterPipelineGenerator.java  |  4 +-
 .../adapter/model/generic/GenericAdapter.java      |  5 +-
 .../connect/adapter/model/generic/Protocol.java    | 10 ---
 .../adapter/model/pipeline/AdapterPipeline.java    | 16 ++--
 .../iiot/protocol/stream/FileStreamProtocol.java   |  2 +-
 streampipes-model/pom.xml                          |  5 ++
 .../model/connect/adapter/AdapterDescription.java  |  3 +
 .../connect/adapter/GenericAdapterDescription.java |  4 -
 .../adapter/GenericAdapterSetDescription.java      | 21 +-----
 .../adapter/GenericAdapterStreamDescription.java   | 12 +--
 .../adapter/SpecificAdapterSetDescription.java     |  5 ++
 .../adapter/SpecificAdapterStreamDescription.java  |  5 ++
 .../apache/streampipes/model/util/SchemaUtils.java | 35 +++++++++
 .../connect/adapter/AdapterDescriptionTest.java    |  2 +-
 .../streampipes/model/util/SchemaUtilsTest.java    | 88 ++++++++++++++++++++++
 .../generator/EventPropertyNestedTestBuilder.java  | 62 +++++++++++++++
 ...java => EventPropertyPrimitiveTestBuilder.java} | 29 ++++++-
 ...enerator.java => EventPropertyTestBuilder.java} | 20 ++++-
 .../test/generator/EventSchemaTestBuilder.java     | 41 +++++-----
 21 files changed, 297 insertions(+), 79 deletions(-)

diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IAdapterPipeline.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IAdapterPipeline.java
index f3434897c..5739396c4 100644
--- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IAdapterPipeline.java
+++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IAdapterPipeline.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.extensions.api.connect;
 
+import org.apache.streampipes.model.schema.EventSchema;
+
 import java.util.List;
 import java.util.Map;
 
@@ -31,4 +33,6 @@ public interface IAdapterPipeline {
   void changePipelineSink(IAdapterPipelineElement pipelineSink);
 
   IAdapterPipelineElement getPipelineSink();
+
+  EventSchema getResultingEventSchema();
 }
diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IProtocol.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IProtocol.java
index 2d9982536..8b4a69ba0 100644
--- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IProtocol.java
+++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IProtocol.java
@@ -21,7 +21,6 @@ import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
 import org.apache.streampipes.extensions.api.connect.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventSchema;
 
 public interface IProtocol extends Connector {
 
@@ -42,6 +41,4 @@ public interface IProtocol extends Connector {
 
   String getId();
 
-  //TODO remove
-  void setEventSchema(EventSchema eventSchema);
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
index 60d409a5e..c344466bc 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
@@ -71,8 +71,6 @@ public class AdapterPipelineGenerator {
     }
     pipelineElements.add(transformStreamAdapterElement);
 
-    // TODO decide what was meant with this comment
-    // Needed when adapter is (
     if (adapterDescription.getEventGrounding() != null
         && adapterDescription.getEventGrounding().getTransportProtocol() != null
         && adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
@@ -84,7 +82,7 @@ public class AdapterPipelineGenerator {
       return new AdapterPipeline(pipelineElements, new DebugAdapterSink());
     }
 
-    return new AdapterPipeline(pipelineElements);
+    return new AdapterPipeline(pipelineElements, adapterDescription.getEventSchema());
   }
 
   public List<IAdapterPipelineElement> makeAdapterPipelineElements(List<TransformationRuleDescription> rules) {
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
index 68e9591c4..54f16fac4 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
@@ -29,7 +29,6 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,8 +68,8 @@ public abstract class GenericAdapter<T extends AdapterDescription> extends Adapt
     this.protocol = protocolInstance;
 
     //TODO remove
-    EventSchema eventSchema = adapterDescription.getEventSchema();
-    this.protocol.setEventSchema(eventSchema);
+//    EventSchema eventSchema = adapterDescription.getEventSchema();
+//    this.protocol.setEventSchema(eventSchema);
 
     logger.debug("Start adatper with format: " + format.getId() + " and " + protocol.getId());
 
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/Protocol.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/Protocol.java
index 9b5a7870d..d7d24ca4c 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/Protocol.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/Protocol.java
@@ -21,16 +21,12 @@ package org.apache.streampipes.extensions.management.connect.adapter.model.gener
 import org.apache.streampipes.extensions.api.connect.IFormat;
 import org.apache.streampipes.extensions.api.connect.IParser;
 import org.apache.streampipes.extensions.api.connect.IProtocol;
-import org.apache.streampipes.model.schema.EventSchema;
 
 public abstract class Protocol implements IProtocol {
 
   protected IParser parser;
   protected IFormat format;
 
-  //TODO remove
-  protected EventSchema eventSchema;
-
   public Protocol() {
 
   }
@@ -39,10 +35,4 @@ public abstract class Protocol implements IProtocol {
     this.parser = parser;
     this.format = format;
   }
-
-  //TODO remove
-  @Override
-  public void setEventSchema(EventSchema eventSchema) {
-    this.eventSchema = eventSchema;
-  }
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
index a055c4f3a..d1a0bc1c6 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.extensions.management.connect.adapter.model.pipel
 
 import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
 import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
+import org.apache.streampipes.model.schema.EventSchema;
 
 import java.util.List;
 import java.util.Map;
@@ -29,9 +30,11 @@ public class AdapterPipeline implements IAdapterPipeline {
   private List<IAdapterPipelineElement> pipelineElements;
   private IAdapterPipelineElement pipelineSink;
 
+  private EventSchema resultingEventSchema;
 
-  public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements) {
+  public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, EventSchema resultingEventSchema) {
     this.pipelineElements = pipelineElements;
+    this.resultingEventSchema = resultingEventSchema;
   }
 
   public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterPipelineElement pipelineSink) {
@@ -42,12 +45,6 @@ public class AdapterPipeline implements IAdapterPipeline {
   @Override
   public void process(Map<String, Object> event) {
 
-    // TODO remove, just for performance tests
-    if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
-      event.put("internal_t1", System.currentTimeMillis());
-    }
-
-
     for (IAdapterPipelineElement pipelineElement : pipelineElements) {
       event = pipelineElement.process(event);
     }
@@ -76,4 +73,9 @@ public class AdapterPipeline implements IAdapterPipeline {
   public IAdapterPipelineElement getPipelineSink() {
     return pipelineSink;
   }
+
+  @Override
+  public EventSchema getResultingEventSchema() {
+    return resultingEventSchema;
+  }
 }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index d47a0f4b4..974265bc0 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -85,7 +85,7 @@ public class FileStreamProtocol extends Protocol {
 
   @Override
   public void run(IAdapterPipeline adapterPipeline) {
-    String timestampKey = getTimestampKey(eventSchema.getEventProperties(), "");
+    String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema().getEventProperties(), "");
 
     // exchange adapter pipeline sink with special purpose replay sink for file replay
     if (adapterPipeline.getPipelineSink() instanceof SendToKafkaAdapterSink) {
diff --git a/streampipes-model/pom.xml b/streampipes-model/pom.xml
index 40bb1ca79..994507656 100644
--- a/streampipes-model/pom.xml
+++ b/streampipes-model/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>streampipes-model-shared</artifactId>
             <version>0.91.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-test-utils</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-logging</artifactId>
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index efbcb7e1c..c685b709e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
 import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.shared.annotation.TsModel;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.model.util.Cloner;
@@ -252,4 +253,6 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
   public void setCorrespondingDataStreamElementId(String correspondingDataStreamElementId) {
     this.correspondingDataStreamElementId = correspondingDataStreamElementId;
   }
+
+  public abstract EventSchema getEventSchema();
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java
index 7468b79fa..f7e2eb8a8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java
@@ -21,7 +21,6 @@ package org.apache.streampipes.model.connect.adapter;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
-import org.apache.streampipes.model.schema.EventSchema;
 
 import java.util.List;
 
@@ -32,8 +31,5 @@ public interface GenericAdapterDescription {
   FormatDescription getFormatDescription();
 
   List<TransformationRuleDescription> getRules();
-
-  EventSchema getEventSchema();
-
 }
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterSetDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterSetDescription.java
index 0779f7ae1..45e292bbf 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterSetDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterSetDescription.java
@@ -30,8 +30,6 @@ public class GenericAdapterSetDescription extends AdapterSetDescription implemen
 
   public static final String ID = ElementIdGenerator.makeFixedElementId(GenericAdapterSetDescription.class);
 
-//    private String sourceType = "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription";
-
   private FormatDescription formatDescription;
 
   private ProtocolDescription protocolDescription;
@@ -66,14 +64,6 @@ public class GenericAdapterSetDescription extends AdapterSetDescription implemen
     this.formatDescription = formatDescription;
   }
 
-  @Override
-  public EventSchema getEventSchema() {
-    if (this.getDataSet() != null) {
-      return this.getDataSet().getEventSchema();
-    }
-    return null;
-  }
-
   public ProtocolDescription getProtocolDescription() {
     return protocolDescription;
   }
@@ -82,11 +72,8 @@ public class GenericAdapterSetDescription extends AdapterSetDescription implemen
     this.protocolDescription = protocolDescription;
   }
 
-//    public String getSourceType() {
-//        return sourceType;
-//    }
-//
-//    public void setSourceType(String sourceType) {
-//        this.sourceType = sourceType;
-//    }
+  public EventSchema getEventSchema() {
+    return this.getDataSet().getEventSchema();
+  }
+
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterStreamDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterStreamDescription.java
index 67a88aa87..b25cf6b5d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterStreamDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterStreamDescription.java
@@ -63,14 +63,6 @@ public class GenericAdapterStreamDescription extends AdapterStreamDescription im
     this.formatDescription = formatDescription;
   }
 
-  @Override
-  public EventSchema getEventSchema() {
-    if (this.getDataStream() != null) {
-      return this.getDataStream().getEventSchema();
-    }
-    return null;
-  }
-
   public ProtocolDescription getProtocolDescription() {
     return protocolDescription;
   }
@@ -78,5 +70,7 @@ public class GenericAdapterStreamDescription extends AdapterStreamDescription im
   public void setProtocolDescription(ProtocolDescription protocolDescription) {
     this.protocolDescription = protocolDescription;
   }
-
+  public EventSchema getEventSchema() {
+    return this.getDataStream().getEventSchema();
+  }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterSetDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterSetDescription.java
index 6052a85a3..d48aa3c86 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterSetDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterSetDescription.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.model.connect.adapter;
 
+import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.shared.annotation.TsModel;
 
 @TsModel
@@ -29,4 +30,8 @@ public class SpecificAdapterSetDescription extends AdapterSetDescription {
   public SpecificAdapterSetDescription(AdapterSetDescription other) {
     super(other);
   }
+
+  public EventSchema getEventSchema() {
+    return this.getDataSet().getEventSchema();
+  }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterStreamDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterStreamDescription.java
index d5f2aa50f..13759dbd2 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterStreamDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterStreamDescription.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.model.connect.adapter;
 
+import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.shared.annotation.TsModel;
 
 @TsModel
@@ -29,4 +30,8 @@ public class SpecificAdapterStreamDescription extends AdapterStreamDescription {
   public SpecificAdapterStreamDescription(AdapterStreamDescription other) {
     super(other);
   }
+
+  public EventSchema getEventSchema() {
+    return this.getDataStream().getEventSchema();
+  }
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
index 655037aea..ba8a15587 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
@@ -19,11 +19,17 @@
 package org.apache.streampipes.model.util;
 
 import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyNested;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.vocabulary.SO;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class SchemaUtils {
 
@@ -48,4 +54,33 @@ public class SchemaUtils {
     }
     return properties;
   }
+
+  /**
+   * Returns the timestamp property of an event schema as an {@code Optional}.
+   *
+   * <p> The method checks whether a property is an {@code EventPropertyPrimitive} whose domain properties
+   * contain the URI http://schema.org/DateTime; it also descends into {@code EventPropertyNested} properties. </p>
+   *
+   * @param eventSchema the event schema for which the timestamp property is to be returned
+   * @return an {@code Optional} containing the timestamp property, or an empty {@code Optional} if
+   * no such property was found
+   */
+  public static Optional<EventPropertyPrimitive> getTimestampProperty(EventSchema eventSchema) {
+    return getTimstampProperty(eventSchema.getEventProperties());
+  }
+
+
+  private static Optional<EventPropertyPrimitive> getTimstampProperty(List<EventProperty> eventProperties) {
+    for (EventProperty ep : eventProperties) {
+      if (ep instanceof EventPropertyPrimitive && ep.getDomainProperties().contains(URI.create(SO.DATE_TIME))) {
+        return Optional.of((EventPropertyPrimitive) ep);
+      }
+
+      if (ep instanceof EventPropertyNested) {
+        return getTimstampProperty(((EventPropertyNested) ep).getEventProperties());
+      }
+    }
+
+    return Optional.empty();
+  }
 }
diff --git a/streampipes-model/src/test/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionTest.java b/streampipes-model/src/test/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionTest.java
index dd8ce6e08..bd529a9ac 100644
--- a/streampipes-model/src/test/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionTest.java
+++ b/streampipes-model/src/test/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionTest.java
@@ -37,7 +37,7 @@ public class AdapterDescriptionTest {
 
   @Before
   public void init() {
-    adapterDescription = new AdapterDescription() {
+    adapterDescription = new SpecificAdapterStreamDescription() {
     };
 
     List rules = new ArrayList<>();
diff --git a/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java b/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java
new file mode 100644
index 000000000..4dd3bb5bc
--- /dev/null
+++ b/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.util;
+
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.test.generator.EventPropertyNestedTestBuilder;
+import org.apache.streampipes.test.generator.EventPropertyPrimitiveTestBuilder;
+import org.apache.streampipes.test.generator.EventSchemaTestBuilder;
+import org.apache.streampipes.vocabulary.SO;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class SchemaUtilsTest {
+
+  EventProperty timestampProperty = EventPropertyPrimitiveTestBuilder.create()
+      .withSemanticType(SO.DATE_TIME)
+      .withRuntimeName("timestamp")
+      .build();
+
+  @Test
+  public void noTimestampPropery() {
+
+    var eventSchema = EventSchemaTestBuilder.create()
+        .withEventProperty(
+            EventPropertyPrimitiveTestBuilder.create().build())
+        .build();
+
+    var result = SchemaUtils.getTimestampProperty(eventSchema);
+
+    assertFalse(result.isPresent());
+  }
+
+  @Test
+  public void getTimestampProperty() {
+    var eventSchema = EventSchemaTestBuilder.create()
+        .withEventProperty(timestampProperty)
+        .build();
+
+//    var timestampProperty = new EventPropertyPrimitive();
+//    timestampProperty.setDomainProperties(List.of(URI.create(SO.DATE_TIME)));
+//    timestampProperty.setRuntimeName("timestamp");
+//    eventSchema.addEventProperty(timestampProperty);
+
+    var result = SchemaUtils.getTimestampProperty(eventSchema);
+
+    assertTrue(result.isPresent());
+    assertEquals(result.get(), timestampProperty);
+  }
+
+  @Test
+  public void getNestedTimestampProperty() {
+
+    var eventSchema = EventSchemaTestBuilder.create()
+        .withEventProperty(
+            EventPropertyNestedTestBuilder.create()
+                .withEventProperty(timestampProperty)
+                .build())
+        .build();
+
+    var result = SchemaUtils.getTimestampProperty(eventSchema);
+
+    assertTrue(result.isPresent());
+    assertEquals(result.get(), timestampProperty);
+  }
+
+
+}
\ No newline at end of file
diff --git a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyNestedTestBuilder.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyNestedTestBuilder.java
new file mode 100644
index 000000000..aecb3b24c
--- /dev/null
+++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyNestedTestBuilder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.test.generator;
+
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyNested;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventPropertyNestedTestBuilder
+    extends EventPropertyTestBuilder<EventPropertyNested, EventPropertyNestedTestBuilder> {
+
+  private List<EventProperty> eventProperties;
+
+  protected EventPropertyNestedTestBuilder() {
+    super(new EventPropertyNested());
+    eventProperties = new ArrayList<>();
+  }
+
+  public static EventPropertyNestedTestBuilder create() {
+    return new EventPropertyNestedTestBuilder();
+  }
+
+  public EventPropertyNestedTestBuilder withEventProperties(List<EventProperty> eventProperties) {
+    this.eventProperties.addAll(eventProperties);
+    return this;
+  }
+
+  public EventPropertyNestedTestBuilder withEventProperty(EventProperty eventProperty) {
+    this.eventProperties.add(eventProperty);
+    return this;
+  }
+
+  @Override
+  protected EventPropertyNestedTestBuilder me() {
+    return this;
+  }
+
+  @Override
+  public EventPropertyNested build() {
+    eventProperty.setEventProperties(eventProperties);
+    return eventProperty;
+  }
+
+}
diff --git a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyGenerator.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyPrimitiveTestBuilder.java
similarity index 51%
rename from streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyGenerator.java
rename to streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyPrimitiveTestBuilder.java
index c3c4d33a7..8fd69330d 100644
--- a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyGenerator.java
+++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyPrimitiveTestBuilder.java
@@ -17,6 +17,33 @@
  */
 package org.apache.streampipes.test.generator;
 
-public class EventPropertyGenerator {
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 
+import java.net.URI;
+import java.util.List;
+
+public class EventPropertyPrimitiveTestBuilder
+    extends EventPropertyTestBuilder<EventPropertyPrimitive, EventPropertyPrimitiveTestBuilder> {
+
+  protected EventPropertyPrimitiveTestBuilder() {
+    super(new EventPropertyPrimitive());
+  }
+
+  public EventPropertyPrimitiveTestBuilder withSemanticType(String semanticType) {
+    this.eventProperty.setDomainProperties(List.of(URI.create(semanticType)));
+    return this;
+  }
+
+  public static EventPropertyPrimitiveTestBuilder create() {
+    return new EventPropertyPrimitiveTestBuilder();
+  }
+
+  @Override
+  public EventPropertyPrimitive build() {
+    return eventProperty;
+  }
+
+  protected EventPropertyPrimitiveTestBuilder me() {
+    return this;
+  }
 }
diff --git a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaGenerator.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyTestBuilder.java
similarity index 63%
rename from streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaGenerator.java
rename to streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyTestBuilder.java
index 90e96e584..6bdb52aab 100644
--- a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaGenerator.java
+++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyTestBuilder.java
@@ -15,8 +15,26 @@
  * limitations under the License.
  *
  */
+
 package org.apache.streampipes.test.generator;
 
-public class EventSchemaGenerator {
+import org.apache.streampipes.model.schema.EventProperty;
+
+public abstract  class EventPropertyTestBuilder<T extends EventProperty, K extends EventPropertyTestBuilder> {
+
+  protected T eventProperty;
+
+  protected abstract K me();
+
+  protected EventPropertyTestBuilder(T eventProperty) {
+    this.eventProperty = eventProperty;
+  }
+
+  public K withRuntimeName(String runtimeName) {
+    this.eventProperty.setRuntimeName(runtimeName);
+    return me();
+  }
+
+  public abstract T build();
 
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaTestBuilder.java
similarity index 53%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
copy to streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaTestBuilder.java
index 655037aea..234505ff8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
+++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaTestBuilder.java
@@ -15,37 +15,40 @@
  * limitations under the License.
  *
  */
-
-package org.apache.streampipes.model.util;
+package org.apache.streampipes.test.generator;
 
 import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventSchema;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-public class SchemaUtils {
+public class EventSchemaTestBuilder {
+
+  private List<EventProperty> eventProperties;
 
-  public static Map<String, Object> toRuntimeMap(List<EventProperty> eps) {
-    return toUntypedRuntimeMap(eps);
+  private EventSchemaTestBuilder() {
+    eventProperties = new ArrayList<>();
   }
 
-  public static Map<String, Object> toUntypedRuntimeMap(List<EventProperty> eps) {
-    Map<String, Object> propertyMap = new HashMap<>();
+  public static EventSchemaTestBuilder create() {
+    return new EventSchemaTestBuilder();
+  }
 
-    for (EventProperty p : eps) {
-      propertyMap.putAll(PropertyUtils.getUntypedRuntimeFormat(p));
-    }
-    return propertyMap;
+  public EventSchemaTestBuilder withEventProperties(List<EventProperty> eventProperties) {
+    this.eventProperties.addAll(eventProperties);
+    return this;
   }
 
-  public static List<String> toPropertyList(List<EventProperty> eps) {
-    List<String> properties = new ArrayList<>();
+  public EventSchemaTestBuilder withEventProperty(EventProperty eventProperty) {
+    eventProperties.add(eventProperty);
+    return this;
+  }
 
-    for (EventProperty p : eps) {
-      properties.addAll(PropertyUtils.getFullPropertyName(p, ""));
-    }
-    return properties;
+  public EventSchema build() {
+    EventSchema eventSchema = new EventSchema();
+    eventSchema.setEventProperties(eventProperties);
+    return eventSchema;
   }
+
 }