You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/01/12 08:37:26 UTC
[streampipes] 01/02: [#1085] Add getter for event schema to AdapterDescription
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch SP-1085
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 4981f1208c209a2093c95440c5ccc75e2626eaf7
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Jan 12 09:36:12 2023 +0100
[#1085] Add getter for event schema to AdapterDescription
---
.../extensions/api/connect/IAdapterPipeline.java | 4 +
.../extensions/api/connect/IProtocol.java | 3 -
.../connect/adapter/AdapterPipelineGenerator.java | 4 +-
.../adapter/model/generic/GenericAdapter.java | 5 +-
.../connect/adapter/model/generic/Protocol.java | 10 ---
.../adapter/model/pipeline/AdapterPipeline.java | 16 ++--
.../iiot/protocol/stream/FileStreamProtocol.java | 2 +-
streampipes-model/pom.xml | 5 ++
.../model/connect/adapter/AdapterDescription.java | 3 +
.../connect/adapter/GenericAdapterDescription.java | 4 -
.../adapter/GenericAdapterSetDescription.java | 21 +-----
.../adapter/GenericAdapterStreamDescription.java | 12 +--
.../adapter/SpecificAdapterSetDescription.java | 5 ++
.../adapter/SpecificAdapterStreamDescription.java | 5 ++
.../apache/streampipes/model/util/SchemaUtils.java | 35 +++++++++
.../connect/adapter/AdapterDescriptionTest.java | 2 +-
.../streampipes/model/util/SchemaUtilsTest.java | 88 ++++++++++++++++++++++
.../generator/EventPropertyNestedTestBuilder.java | 62 +++++++++++++++
...java => EventPropertyPrimitiveTestBuilder.java} | 29 ++++++-
...enerator.java => EventPropertyTestBuilder.java} | 20 ++++-
.../test/generator/EventSchemaTestBuilder.java | 41 +++++-----
21 files changed, 297 insertions(+), 79 deletions(-)
diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IAdapterPipeline.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IAdapterPipeline.java
index f3434897c..5739396c4 100644
--- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IAdapterPipeline.java
+++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IAdapterPipeline.java
@@ -17,6 +17,8 @@
*/
package org.apache.streampipes.extensions.api.connect;
+import org.apache.streampipes.model.schema.EventSchema;
+
import java.util.List;
import java.util.Map;
@@ -31,4 +33,6 @@ public interface IAdapterPipeline {
void changePipelineSink(IAdapterPipelineElement pipelineSink);
IAdapterPipelineElement getPipelineSink();
+
+ EventSchema getResultingEventSchema();
}
diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IProtocol.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IProtocol.java
index 2d9982536..8b4a69ba0 100644
--- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IProtocol.java
+++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IProtocol.java
@@ -21,7 +21,6 @@ import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
import org.apache.streampipes.extensions.api.connect.exception.ParseException;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventSchema;
public interface IProtocol extends Connector {
@@ -42,6 +41,4 @@ public interface IProtocol extends Connector {
String getId();
- //TODO remove
- void setEventSchema(EventSchema eventSchema);
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
index 60d409a5e..c344466bc 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
@@ -71,8 +71,6 @@ public class AdapterPipelineGenerator {
}
pipelineElements.add(transformStreamAdapterElement);
- // TODO decide what was meant with this comment
- // Needed when adapter is (
if (adapterDescription.getEventGrounding() != null
&& adapterDescription.getEventGrounding().getTransportProtocol() != null
&& adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
@@ -84,7 +82,7 @@ public class AdapterPipelineGenerator {
return new AdapterPipeline(pipelineElements, new DebugAdapterSink());
}
- return new AdapterPipeline(pipelineElements);
+ return new AdapterPipeline(pipelineElements, adapterDescription.getEventSchema());
}
public List<IAdapterPipelineElement> makeAdapterPipelineElements(List<TransformationRuleDescription> rules) {
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
index 68e9591c4..54f16fac4 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
@@ -29,7 +29,6 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,8 +68,8 @@ public abstract class GenericAdapter<T extends AdapterDescription> extends Adapt
this.protocol = protocolInstance;
//TODO remove
- EventSchema eventSchema = adapterDescription.getEventSchema();
- this.protocol.setEventSchema(eventSchema);
+// EventSchema eventSchema = adapterDescription.getEventSchema();
+// this.protocol.setEventSchema(eventSchema);
logger.debug("Start adatper with format: " + format.getId() + " and " + protocol.getId());
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/Protocol.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/Protocol.java
index 9b5a7870d..d7d24ca4c 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/Protocol.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/Protocol.java
@@ -21,16 +21,12 @@ package org.apache.streampipes.extensions.management.connect.adapter.model.gener
import org.apache.streampipes.extensions.api.connect.IFormat;
import org.apache.streampipes.extensions.api.connect.IParser;
import org.apache.streampipes.extensions.api.connect.IProtocol;
-import org.apache.streampipes.model.schema.EventSchema;
public abstract class Protocol implements IProtocol {
protected IParser parser;
protected IFormat format;
- //TODO remove
- protected EventSchema eventSchema;
-
public Protocol() {
}
@@ -39,10 +35,4 @@ public abstract class Protocol implements IProtocol {
this.parser = parser;
this.format = format;
}
-
- //TODO remove
- @Override
- public void setEventSchema(EventSchema eventSchema) {
- this.eventSchema = eventSchema;
- }
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
index a055c4f3a..d1a0bc1c6 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.extensions.management.connect.adapter.model.pipel
import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
+import org.apache.streampipes.model.schema.EventSchema;
import java.util.List;
import java.util.Map;
@@ -29,9 +30,11 @@ public class AdapterPipeline implements IAdapterPipeline {
private List<IAdapterPipelineElement> pipelineElements;
private IAdapterPipelineElement pipelineSink;
+ private EventSchema resultingEventSchema;
- public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements) {
+ public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, EventSchema resultingEventSchema) {
this.pipelineElements = pipelineElements;
+ this.resultingEventSchema = resultingEventSchema;
}
public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterPipelineElement pipelineSink) {
@@ -42,12 +45,6 @@ public class AdapterPipeline implements IAdapterPipeline {
@Override
public void process(Map<String, Object> event) {
- // TODO remove, just for performance tests
- if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
- event.put("internal_t1", System.currentTimeMillis());
- }
-
-
for (IAdapterPipelineElement pipelineElement : pipelineElements) {
event = pipelineElement.process(event);
}
@@ -76,4 +73,9 @@ public class AdapterPipeline implements IAdapterPipeline {
public IAdapterPipelineElement getPipelineSink() {
return pipelineSink;
}
+
+ @Override
+ public EventSchema getResultingEventSchema() {
+ return resultingEventSchema;
+ }
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index d47a0f4b4..974265bc0 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -85,7 +85,7 @@ public class FileStreamProtocol extends Protocol {
@Override
public void run(IAdapterPipeline adapterPipeline) {
- String timestampKey = getTimestampKey(eventSchema.getEventProperties(), "");
+ String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema().getEventProperties(), "");
// exchange adapter pipeline sink with special purpose replay sink for file replay
if (adapterPipeline.getPipelineSink() instanceof SendToKafkaAdapterSink) {
diff --git a/streampipes-model/pom.xml b/streampipes-model/pom.xml
index 40bb1ca79..994507656 100644
--- a/streampipes-model/pom.xml
+++ b/streampipes-model/pom.xml
@@ -38,6 +38,11 @@
<artifactId>streampipes-model-shared</artifactId>
<version>0.91.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-test-utils</artifactId>
+ <version>0.91.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-logging</artifactId>
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
index efbcb7e1c..c685b709e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.shared.annotation.TsModel;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.util.Cloner;
@@ -252,4 +253,6 @@ public abstract class AdapterDescription extends NamedStreamPipesEntity {
public void setCorrespondingDataStreamElementId(String correspondingDataStreamElementId) {
this.correspondingDataStreamElementId = correspondingDataStreamElementId;
}
+
+ public abstract EventSchema getEventSchema();
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java
index 7468b79fa..f7e2eb8a8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterDescription.java
@@ -21,7 +21,6 @@ package org.apache.streampipes.model.connect.adapter;
import org.apache.streampipes.model.connect.grounding.FormatDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
-import org.apache.streampipes.model.schema.EventSchema;
import java.util.List;
@@ -32,8 +31,5 @@ public interface GenericAdapterDescription {
FormatDescription getFormatDescription();
List<TransformationRuleDescription> getRules();
-
- EventSchema getEventSchema();
-
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterSetDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterSetDescription.java
index 0779f7ae1..45e292bbf 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterSetDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterSetDescription.java
@@ -30,8 +30,6 @@ public class GenericAdapterSetDescription extends AdapterSetDescription implemen
public static final String ID = ElementIdGenerator.makeFixedElementId(GenericAdapterSetDescription.class);
-// private String sourceType = "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription";
-
private FormatDescription formatDescription;
private ProtocolDescription protocolDescription;
@@ -66,14 +64,6 @@ public class GenericAdapterSetDescription extends AdapterSetDescription implemen
this.formatDescription = formatDescription;
}
- @Override
- public EventSchema getEventSchema() {
- if (this.getDataSet() != null) {
- return this.getDataSet().getEventSchema();
- }
- return null;
- }
-
public ProtocolDescription getProtocolDescription() {
return protocolDescription;
}
@@ -82,11 +72,8 @@ public class GenericAdapterSetDescription extends AdapterSetDescription implemen
this.protocolDescription = protocolDescription;
}
-// public String getSourceType() {
-// return sourceType;
-// }
-//
-// public void setSourceType(String sourceType) {
-// this.sourceType = sourceType;
-// }
+ public EventSchema getEventSchema() {
+ return this.getDataSet().getEventSchema();
+ }
+
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterStreamDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterStreamDescription.java
index 67a88aa87..b25cf6b5d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterStreamDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/GenericAdapterStreamDescription.java
@@ -63,14 +63,6 @@ public class GenericAdapterStreamDescription extends AdapterStreamDescription im
this.formatDescription = formatDescription;
}
- @Override
- public EventSchema getEventSchema() {
- if (this.getDataStream() != null) {
- return this.getDataStream().getEventSchema();
- }
- return null;
- }
-
public ProtocolDescription getProtocolDescription() {
return protocolDescription;
}
@@ -78,5 +70,7 @@ public class GenericAdapterStreamDescription extends AdapterStreamDescription im
public void setProtocolDescription(ProtocolDescription protocolDescription) {
this.protocolDescription = protocolDescription;
}
-
+ public EventSchema getEventSchema() {
+ return this.getDataStream().getEventSchema();
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterSetDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterSetDescription.java
index 6052a85a3..d48aa3c86 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterSetDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterSetDescription.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.model.connect.adapter;
+import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.shared.annotation.TsModel;
@TsModel
@@ -29,4 +30,8 @@ public class SpecificAdapterSetDescription extends AdapterSetDescription {
public SpecificAdapterSetDescription(AdapterSetDescription other) {
super(other);
}
+
+ public EventSchema getEventSchema() {
+ return this.getDataSet().getEventSchema();
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterStreamDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterStreamDescription.java
index d5f2aa50f..13759dbd2 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterStreamDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/SpecificAdapterStreamDescription.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.model.connect.adapter;
+import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.shared.annotation.TsModel;
@TsModel
@@ -29,4 +30,8 @@ public class SpecificAdapterStreamDescription extends AdapterStreamDescription {
public SpecificAdapterStreamDescription(AdapterStreamDescription other) {
super(other);
}
+
+ public EventSchema getEventSchema() {
+ return this.getDataStream().getEventSchema();
+ }
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
index 655037aea..ba8a15587 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
@@ -19,11 +19,17 @@
package org.apache.streampipes.model.util;
import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyNested;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.vocabulary.SO;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class SchemaUtils {
@@ -48,4 +54,33 @@ public class SchemaUtils {
}
return properties;
}
+
+ /**
+ * Returns the timestamp property of an event schema as an {@code Optional}.
+ *
+ * <p> The method checks all properties if they are of type {@code EventPropertyPrimitive} and if their domain
+ * properties contains the uri http://schema.org/DateTime </p>
+ *
+ * @param eventSchema the event schema for which the timestamp property is to be returned
+ * @return an {@code Optional} containing the timestamp property, or an empty {@code Optional} if
+ * no such property was found
+ */
+ public static Optional<EventPropertyPrimitive> getTimestampProperty(EventSchema eventSchema) {
+ return getTimstampProperty(eventSchema.getEventProperties());
+ }
+
+
+ private static Optional<EventPropertyPrimitive> getTimstampProperty(List<EventProperty> eventProperties) {
+ for (EventProperty ep : eventProperties) {
+ if (ep instanceof EventPropertyPrimitive && ep.getDomainProperties().contains(URI.create(SO.DATE_TIME))) {
+ return Optional.of((EventPropertyPrimitive) ep);
+ }
+
+ if (ep instanceof EventPropertyNested) {
+ return getTimstampProperty(((EventPropertyNested) ep).getEventProperties());
+ }
+ }
+
+ return Optional.empty();
+ }
}
diff --git a/streampipes-model/src/test/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionTest.java b/streampipes-model/src/test/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionTest.java
index dd8ce6e08..bd529a9ac 100644
--- a/streampipes-model/src/test/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionTest.java
+++ b/streampipes-model/src/test/java/org/apache/streampipes/model/connect/adapter/AdapterDescriptionTest.java
@@ -37,7 +37,7 @@ public class AdapterDescriptionTest {
@Before
public void init() {
- adapterDescription = new AdapterDescription() {
+ adapterDescription = new SpecificAdapterStreamDescription() {
};
List rules = new ArrayList<>();
diff --git a/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java b/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java
new file mode 100644
index 000000000..4dd3bb5bc
--- /dev/null
+++ b/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.util;
+
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.test.generator.EventPropertyNestedTestBuilder;
+import org.apache.streampipes.test.generator.EventPropertyPrimitiveTestBuilder;
+import org.apache.streampipes.test.generator.EventSchemaTestBuilder;
+import org.apache.streampipes.vocabulary.SO;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class SchemaUtilsTest {
+
+ EventProperty timestampProperty = EventPropertyPrimitiveTestBuilder.create()
+ .withSemanticType(SO.DATE_TIME)
+ .withRuntimeName("timestamp")
+ .build();
+
+ @Test
+ public void noTimestampPropery() {
+
+ var eventSchema = EventSchemaTestBuilder.create()
+ .withEventProperty(
+ EventPropertyPrimitiveTestBuilder.create().build())
+ .build();
+
+ var result = SchemaUtils.getTimestampProperty(eventSchema);
+
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void getTimestampProperty() {
+ var eventSchema = EventSchemaTestBuilder.create()
+ .withEventProperty(timestampProperty)
+ .build();
+
+// var timestampProperty = new EventPropertyPrimitive();
+// timestampProperty.setDomainProperties(List.of(URI.create(SO.DATE_TIME)));
+// timestampProperty.setRuntimeName("timestamp");
+// eventSchema.addEventProperty(timestampProperty);
+
+ var result = SchemaUtils.getTimestampProperty(eventSchema);
+
+ assertTrue(result.isPresent());
+ assertEquals(result.get(), timestampProperty);
+ }
+
+ @Test
+ public void getNestedTimestampProperty() {
+
+ var eventSchema = EventSchemaTestBuilder.create()
+ .withEventProperty(
+ EventPropertyNestedTestBuilder.create()
+ .withEventProperty(timestampProperty)
+ .build())
+ .build();
+
+ var result = SchemaUtils.getTimestampProperty(eventSchema);
+
+ assertTrue(result.isPresent());
+ assertEquals(result.get(), timestampProperty);
+ }
+
+
+}
\ No newline at end of file
diff --git a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyNestedTestBuilder.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyNestedTestBuilder.java
new file mode 100644
index 000000000..aecb3b24c
--- /dev/null
+++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyNestedTestBuilder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.test.generator;
+
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyNested;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventPropertyNestedTestBuilder
+ extends EventPropertyTestBuilder<EventPropertyNested, EventPropertyNestedTestBuilder> {
+
+ private List<EventProperty> eventProperties;
+
+ protected EventPropertyNestedTestBuilder() {
+ super(new EventPropertyNested());
+ eventProperties = new ArrayList<>();
+ }
+
+ public static EventPropertyNestedTestBuilder create() {
+ return new EventPropertyNestedTestBuilder();
+ }
+
+ public EventPropertyNestedTestBuilder withEventProperties(List<EventProperty> eventProperties) {
+ this.eventProperties.addAll(eventProperties);
+ return this;
+ }
+
+ public EventPropertyNestedTestBuilder withEventProperty(EventProperty eventProperty) {
+ this.eventProperties.add(eventProperty);
+ return this;
+ }
+
+ @Override
+ protected EventPropertyNestedTestBuilder me() {
+ return this;
+ }
+
+ @Override
+ public EventPropertyNested build() {
+ eventProperty.setEventProperties(eventProperties);
+ return eventProperty;
+ }
+
+}
diff --git a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyGenerator.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyPrimitiveTestBuilder.java
similarity index 51%
rename from streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyGenerator.java
rename to streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyPrimitiveTestBuilder.java
index c3c4d33a7..8fd69330d 100644
--- a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyGenerator.java
+++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyPrimitiveTestBuilder.java
@@ -17,6 +17,33 @@
*/
package org.apache.streampipes.test.generator;
-public class EventPropertyGenerator {
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import java.net.URI;
+import java.util.List;
+
+public class EventPropertyPrimitiveTestBuilder
+ extends EventPropertyTestBuilder<EventPropertyPrimitive, EventPropertyPrimitiveTestBuilder> {
+
+ protected EventPropertyPrimitiveTestBuilder() {
+ super(new EventPropertyPrimitive());
+ }
+
+ public EventPropertyPrimitiveTestBuilder withSemanticType(String semanticType) {
+ this.eventProperty.setDomainProperties(List.of(URI.create(semanticType)));
+ return this;
+ }
+
+ public static EventPropertyPrimitiveTestBuilder create() {
+ return new EventPropertyPrimitiveTestBuilder();
+ }
+
+ @Override
+ public EventPropertyPrimitive build() {
+ return eventProperty;
+ }
+
+ protected EventPropertyPrimitiveTestBuilder me() {
+ return this;
+ }
}
diff --git a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaGenerator.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyTestBuilder.java
similarity index 63%
rename from streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaGenerator.java
rename to streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyTestBuilder.java
index 90e96e584..6bdb52aab 100644
--- a/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaGenerator.java
+++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventPropertyTestBuilder.java
@@ -15,8 +15,26 @@
* limitations under the License.
*
*/
+
package org.apache.streampipes.test.generator;
-public class EventSchemaGenerator {
+import org.apache.streampipes.model.schema.EventProperty;
+
+public abstract class EventPropertyTestBuilder<T extends EventProperty, K extends EventPropertyTestBuilder> {
+
+ protected T eventProperty;
+
+ protected abstract K me();
+
+ protected EventPropertyTestBuilder(T eventProperty) {
+ this.eventProperty = eventProperty;
+ }
+
+ public K withRuntimeName(String runtimeName) {
+ this.eventProperty.setRuntimeName(runtimeName);
+ return me();
+ }
+
+ public abstract T build();
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaTestBuilder.java
similarity index 53%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
copy to streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaTestBuilder.java
index 655037aea..234505ff8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
+++ b/streampipes-test-utils/src/main/java/org/apache/streampipes/test/generator/EventSchemaTestBuilder.java
@@ -15,37 +15,40 @@
* limitations under the License.
*
*/
-
-package org.apache.streampipes.model.util;
+package org.apache.streampipes.test.generator;
import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventSchema;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-public class SchemaUtils {
+public class EventSchemaTestBuilder {
+
+ private List<EventProperty> eventProperties;
- public static Map<String, Object> toRuntimeMap(List<EventProperty> eps) {
- return toUntypedRuntimeMap(eps);
+ private EventSchemaTestBuilder() {
+ eventProperties = new ArrayList<>();
}
- public static Map<String, Object> toUntypedRuntimeMap(List<EventProperty> eps) {
- Map<String, Object> propertyMap = new HashMap<>();
+ public static EventSchemaTestBuilder create() {
+ return new EventSchemaTestBuilder();
+ }
- for (EventProperty p : eps) {
- propertyMap.putAll(PropertyUtils.getUntypedRuntimeFormat(p));
- }
- return propertyMap;
+ public EventSchemaTestBuilder withEventProperties(List<EventProperty> eventProperties) {
+ this.eventProperties.addAll(eventProperties);
+ return this;
}
- public static List<String> toPropertyList(List<EventProperty> eps) {
- List<String> properties = new ArrayList<>();
+ public EventSchemaTestBuilder withEventProperty(EventProperty eventProperty) {
+ eventProperties.add(eventProperty);
+ return this;
+ }
- for (EventProperty p : eps) {
- properties.addAll(PropertyUtils.getFullPropertyName(p, ""));
- }
- return properties;
+ public EventSchema build() {
+ EventSchema eventSchema = new EventSchema();
+ eventSchema.setEventProperties(eventProperties);
+ return eventSchema;
}
+
}