You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/01/13 13:12:38 UTC
[streampipes] 02/02: [#1085] Extract find timestamp property from FileStreamProtocol to EventSchemaUtils
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch SP-1085
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 02059a87d314191368147d2d4d38d19d70abb599
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Fri Jan 13 14:12:23 2023 +0100
[#1085] Extract find timestamp property from FileStreamProtocol to EventSchemaUtils
---
streampipes-extensions-management/pom.xml | 6 ++++
.../connect/adapter/AdapterPipelineGenerator.java | 9 ++++--
.../adapter/model/generic/GenericAdapter.java | 4 ---
.../pipeline/AdapterEventPreviewPipeline.java | 6 ++++
.../adapter/model/pipeline/AdapterPipeline.java | 8 +++--
.../management/util/EventSchemaUtils.java | 29 ++----------------
.../management/util/EventSchemaUtilsTest.java | 18 ++++--------
.../iiot/protocol/stream/FileStreamProtocol.java | 34 ++++------------------
streampipes-model/pom.xml | 5 ----
.../apache/streampipes/model/util/SchemaUtils.java | 34 ----------------------
10 files changed, 36 insertions(+), 117 deletions(-)
diff --git a/streampipes-extensions-management/pom.xml b/streampipes-extensions-management/pom.xml
index 883d30fb4..0b6c9e15d 100644
--- a/streampipes-extensions-management/pom.xml
+++ b/streampipes-extensions-management/pom.xml
@@ -96,6 +96,12 @@
<artifactId>streampipes-service-discovery</artifactId>
<version>0.91.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-test-utils</artifactId>
+ <version>0.91.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-vocabulary</artifactId>
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
index c344466bc..32a913f99 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
@@ -74,12 +74,15 @@ public class AdapterPipelineGenerator {
if (adapterDescription.getEventGrounding() != null
&& adapterDescription.getEventGrounding().getTransportProtocol() != null
&& adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
- return new AdapterPipeline(pipelineElements, getAdapterSink(adapterDescription));
+ return new AdapterPipeline(
+ pipelineElements,
+ getAdapterSink(adapterDescription),
+ adapterDescription.getEventSchema());
}
DebugSinkRuleDescription debugSinkRuleDescription = getDebugRule(adapterDescription.getRules());
if (debugSinkRuleDescription != null) {
- return new AdapterPipeline(pipelineElements, new DebugAdapterSink());
+ return new AdapterPipeline(pipelineElements, new DebugAdapterSink(), adapterDescription.getEventSchema());
}
return new AdapterPipeline(pipelineElements, adapterDescription.getEventSchema());
@@ -176,7 +179,7 @@ public class AdapterPipelineGenerator {
}
private boolean isPrioritized(SpProtocol prioritizedProtocol,
- Class<?> protocolClass) {
+ Class<?> protocolClass) {
return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName());
}
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
index 54f16fac4..fa8f864f0 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
@@ -67,10 +67,6 @@ public abstract class GenericAdapter<T extends AdapterDescription> extends Adapt
IProtocol protocolInstance = this.protocol.getInstance(protocolDescription, parser, format);
this.protocol = protocolInstance;
- //TODO remove
-// EventSchema eventSchema = adapterDescription.getEventSchema();
-// this.protocol.setEventSchema(eventSchema);
-
logger.debug("Start adatper with format: " + format.getId() + " and " + protocol.getId());
protocolInstance.run(adapterPipeline);
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java
index 3367d2155..17a70bfcd 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
import org.apache.streampipes.extensions.management.connect.adapter.AdapterPipelineGenerator;
import org.apache.streampipes.model.connect.guess.AdapterEventPreview;
import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
+import org.apache.streampipes.model.schema.EventSchema;
import java.util.List;
import java.util.Map;
@@ -79,4 +80,9 @@ public class AdapterEventPreviewPipeline implements IAdapterPipeline {
.collect(Collectors.toMap(Map.Entry::getKey,
e -> new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue())));
}
+
+ @Override
+ public EventSchema getResultingEventSchema() {
+ return null;
+ }
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
index d1a0bc1c6..7edd585ec 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -32,14 +32,18 @@ public class AdapterPipeline implements IAdapterPipeline {
private EventSchema resultingEventSchema;
- public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, EventSchema resultingEventSchema) {
+ public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+ EventSchema resultingEventSchema) {
this.pipelineElements = pipelineElements;
this.resultingEventSchema = resultingEventSchema;
}
- public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterPipelineElement pipelineSink) {
+ public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+ IAdapterPipelineElement pipelineSink,
+ EventSchema resultingEventSchema) {
this.pipelineElements = pipelineElements;
this.pipelineSink = pipelineSink;
+ this.resultingEventSchema = resultingEventSchema;
}
@Override
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java
similarity index 74%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
copy to streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java
index ba8a15587..f0a702044 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.model.util;
+package org.apache.streampipes.extensions.management.util;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyNested;
@@ -25,35 +25,10 @@ import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.vocabulary.SO;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-public class SchemaUtils {
-
- public static Map<String, Object> toRuntimeMap(List<EventProperty> eps) {
- return toUntypedRuntimeMap(eps);
- }
-
- public static Map<String, Object> toUntypedRuntimeMap(List<EventProperty> eps) {
- Map<String, Object> propertyMap = new HashMap<>();
-
- for (EventProperty p : eps) {
- propertyMap.putAll(PropertyUtils.getUntypedRuntimeFormat(p));
- }
- return propertyMap;
- }
-
- public static List<String> toPropertyList(List<EventProperty> eps) {
- List<String> properties = new ArrayList<>();
-
- for (EventProperty p : eps) {
- properties.addAll(PropertyUtils.getFullPropertyName(p, ""));
- }
- return properties;
- }
+public class EventSchemaUtils {
/**
* Returns the timestamp property of an event schema as an {@code Optional}.
diff --git a/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java
similarity index 81%
rename from streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java
rename to streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java
index 4dd3bb5bc..58797f97f 100644
--- a/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java
+++ b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.model.util;
+package org.apache.streampipes.extensions.management.util;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.test.generator.EventPropertyNestedTestBuilder;
@@ -30,8 +30,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-
-public class SchemaUtilsTest {
+public class EventSchemaUtilsTest {
EventProperty timestampProperty = EventPropertyPrimitiveTestBuilder.create()
.withSemanticType(SO.DATE_TIME)
@@ -46,7 +45,7 @@ public class SchemaUtilsTest {
EventPropertyPrimitiveTestBuilder.create().build())
.build();
- var result = SchemaUtils.getTimestampProperty(eventSchema);
+ var result = EventSchemaUtils.getTimestampProperty(eventSchema);
assertFalse(result.isPresent());
}
@@ -57,12 +56,7 @@ public class SchemaUtilsTest {
.withEventProperty(timestampProperty)
.build();
-// var timestampProperty = new EventPropertyPrimitive();
-// timestampProperty.setDomainProperties(List.of(URI.create(SO.DATE_TIME)));
-// timestampProperty.setRuntimeName("timestamp");
-// eventSchema.addEventProperty(timestampProperty);
-
- var result = SchemaUtils.getTimestampProperty(eventSchema);
+ var result = EventSchemaUtils.getTimestampProperty(eventSchema);
assertTrue(result.isPresent());
assertEquals(result.get(), timestampProperty);
@@ -78,11 +72,9 @@ public class SchemaUtilsTest {
.build())
.build();
- var result = SchemaUtils.getTimestampProperty(eventSchema);
+ var result = EventSchemaUtils.getTimestampProperty(eventSchema);
assertTrue(result.isPresent());
assertEquals(result.get(), timestampProperty);
}
-
-
}
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 974265bc0..a07813dbb 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -30,13 +30,10 @@ import org.apache.streampipes.extensions.management.connect.adapter.preprocessin
import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToJmsAdapterSink;
import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToMqttAdapterSink;
+import org.apache.streampipes.extensions.management.util.EventSchemaUtils;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyList;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
@@ -53,7 +50,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
public class FileStreamProtocol extends Protocol {
@@ -85,7 +81,7 @@ public class FileStreamProtocol extends Protocol {
@Override
public void run(IAdapterPipeline adapterPipeline) {
- String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema().getEventProperties(), "");
+ String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema());
// exchange adapter pipeline sink with special purpose replay sink for file replay
if (adapterPipeline.getPipelineSink() instanceof SendToKafkaAdapterSink) {
@@ -172,29 +168,9 @@ public class FileStreamProtocol extends Protocol {
return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay);
}
- private String getTimestampKey(List<EventProperty> eventProperties, String prefixKey) {
- String result = null;
- for (EventProperty eventProperty : eventProperties) {
- if (eventProperty instanceof EventPropertyPrimitive && eventProperty.getDomainProperties() != null) {
- for (int i = eventProperty.getDomainProperties().size() - 1; i >= 0; i--) {
- if (eventProperty.getDomainProperties().get(0).toString().equals("http://schema.org/DateTime")) {
- result = prefixKey + eventProperty.getRuntimeName();
- }
- }
- } else if (eventProperty instanceof EventPropertyNested
- && ((EventPropertyNested) eventProperty).getEventProperties() != null) {
- result = getTimestampKey(((EventPropertyNested) eventProperty).getEventProperties(),
- prefixKey + eventProperty.getRuntimeName() + ".");
- } else if (eventProperty instanceof EventPropertyList
- && ((EventPropertyList) eventProperty).getEventProperty() != null) {
- result = getTimestampKey(Arrays.asList(((EventPropertyList) eventProperty).getEventProperty()),
- prefixKey + eventProperty.getRuntimeName() + ".");
- }
- if (result != null) {
- return result;
- }
- }
- return result;
+ private String getTimestampKey(EventSchema eventSchema) {
+ var timestampProperty = EventSchemaUtils.getTimestampProperty(eventSchema);
+ return timestampProperty.get().getRuntimeName();
}
@Override
diff --git a/streampipes-model/pom.xml b/streampipes-model/pom.xml
index 994507656..40bb1ca79 100644
--- a/streampipes-model/pom.xml
+++ b/streampipes-model/pom.xml
@@ -38,11 +38,6 @@
<artifactId>streampipes-model-shared</artifactId>
<version>0.91.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-test-utils</artifactId>
- <version>0.91.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-logging</artifactId>
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
index ba8a15587..48738c143 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
@@ -19,17 +19,11 @@
package org.apache.streampipes.model.util;
import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.vocabulary.SO;
-import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
public class SchemaUtils {
@@ -55,32 +49,4 @@ public class SchemaUtils {
return properties;
}
- /**
- * Returns the timestamp property of an event schema as an {@code Optional}.
- *
- * <p> The method checks all properties if they are of type {@code EventPropertyPrimitive} and if their domain
- * properties contains the uri http://schema.org/DateTime </p>
- *
- * @param eventSchema the event schema for which the timestamp property is to be returned
- * @return an {@code Optional} containing the timestamp property, or an empty {@code Optional} if
- * no such property was found
- */
- public static Optional<EventPropertyPrimitive> getTimestampProperty(EventSchema eventSchema) {
- return getTimstampProperty(eventSchema.getEventProperties());
- }
-
-
- private static Optional<EventPropertyPrimitive> getTimstampProperty(List<EventProperty> eventProperties) {
- for (EventProperty ep : eventProperties) {
- if (ep instanceof EventPropertyPrimitive && ep.getDomainProperties().contains(URI.create(SO.DATE_TIME))) {
- return Optional.of((EventPropertyPrimitive) ep);
- }
-
- if (ep instanceof EventPropertyNested) {
- return getTimstampProperty(((EventPropertyNested) ep).getEventProperties());
- }
- }
-
- return Optional.empty();
- }
}