You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/01/13 13:12:38 UTC

[streampipes] 02/02: [#1085] Extract find timestamp property from FileStreamProtocol to EventSchemaUtils

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1085
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 02059a87d314191368147d2d4d38d19d70abb599
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Fri Jan 13 14:12:23 2023 +0100

    [#1085] Extract find timestamp property from FileStreamProtocol to EventSchemaUtils
---
 streampipes-extensions-management/pom.xml          |  6 ++++
 .../connect/adapter/AdapterPipelineGenerator.java  |  9 ++++--
 .../adapter/model/generic/GenericAdapter.java      |  4 ---
 .../pipeline/AdapterEventPreviewPipeline.java      |  6 ++++
 .../adapter/model/pipeline/AdapterPipeline.java    |  8 +++--
 .../management/util/EventSchemaUtils.java          | 29 ++----------------
 .../management/util/EventSchemaUtilsTest.java      | 18 ++++--------
 .../iiot/protocol/stream/FileStreamProtocol.java   | 34 ++++------------------
 streampipes-model/pom.xml                          |  5 ----
 .../apache/streampipes/model/util/SchemaUtils.java | 34 ----------------------
 10 files changed, 36 insertions(+), 117 deletions(-)

diff --git a/streampipes-extensions-management/pom.xml b/streampipes-extensions-management/pom.xml
index 883d30fb4..0b6c9e15d 100644
--- a/streampipes-extensions-management/pom.xml
+++ b/streampipes-extensions-management/pom.xml
@@ -96,6 +96,12 @@
             <artifactId>streampipes-service-discovery</artifactId>
             <version>0.91.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-test-utils</artifactId>
+            <version>0.91.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-vocabulary</artifactId>
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
index c344466bc..32a913f99 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
@@ -74,12 +74,15 @@ public class AdapterPipelineGenerator {
     if (adapterDescription.getEventGrounding() != null
         && adapterDescription.getEventGrounding().getTransportProtocol() != null
         && adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
-      return new AdapterPipeline(pipelineElements, getAdapterSink(adapterDescription));
+      return new AdapterPipeline(
+          pipelineElements,
+          getAdapterSink(adapterDescription),
+          adapterDescription.getEventSchema());
     }
 
     DebugSinkRuleDescription debugSinkRuleDescription = getDebugRule(adapterDescription.getRules());
     if (debugSinkRuleDescription != null) {
-      return new AdapterPipeline(pipelineElements, new DebugAdapterSink());
+      return new AdapterPipeline(pipelineElements, new DebugAdapterSink(), adapterDescription.getEventSchema());
     }
 
     return new AdapterPipeline(pipelineElements, adapterDescription.getEventSchema());
@@ -176,7 +179,7 @@ public class AdapterPipelineGenerator {
   }
 
   private boolean isPrioritized(SpProtocol prioritizedProtocol,
-                                      Class<?> protocolClass) {
+                                Class<?> protocolClass) {
     return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName());
   }
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
index 54f16fac4..fa8f864f0 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java
@@ -67,10 +67,6 @@ public abstract class GenericAdapter<T extends AdapterDescription> extends Adapt
     IProtocol protocolInstance = this.protocol.getInstance(protocolDescription, parser, format);
     this.protocol = protocolInstance;
 
-    //TODO remove
-//    EventSchema eventSchema = adapterDescription.getEventSchema();
-//    this.protocol.setEventSchema(eventSchema);
-
     logger.debug("Start adatper with format: " + format.getId() + " and " + protocol.getId());
 
     protocolInstance.run(adapterPipeline);
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java
index 3367d2155..17a70bfcd 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
 import org.apache.streampipes.extensions.management.connect.adapter.AdapterPipelineGenerator;
 import org.apache.streampipes.model.connect.guess.AdapterEventPreview;
 import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
+import org.apache.streampipes.model.schema.EventSchema;
 
 import java.util.List;
 import java.util.Map;
@@ -79,4 +80,9 @@ public class AdapterEventPreviewPipeline implements IAdapterPipeline {
         .collect(Collectors.toMap(Map.Entry::getKey,
             e -> new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue())));
   }
+
+  @Override
+  public EventSchema getResultingEventSchema() {
+    return null;
+  }
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
index d1a0bc1c6..7edd585ec 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -32,14 +32,18 @@ public class AdapterPipeline implements IAdapterPipeline {
 
   private EventSchema resultingEventSchema;
 
-  public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, EventSchema resultingEventSchema) {
+  public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+                         EventSchema resultingEventSchema) {
     this.pipelineElements = pipelineElements;
     this.resultingEventSchema = resultingEventSchema;
   }
 
-  public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterPipelineElement pipelineSink) {
+  public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+                         IAdapterPipelineElement pipelineSink,
+                         EventSchema resultingEventSchema) {
     this.pipelineElements = pipelineElements;
     this.pipelineSink = pipelineSink;
+    this.resultingEventSchema = resultingEventSchema;
   }
 
   @Override
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java
similarity index 74%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
copy to streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java
index ba8a15587..f0a702044 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.model.util;
+package org.apache.streampipes.extensions.management.util;
 
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyNested;
@@ -25,35 +25,10 @@ import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.vocabulary.SO;
 
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
-public class SchemaUtils {
-
-  public static Map<String, Object> toRuntimeMap(List<EventProperty> eps) {
-    return toUntypedRuntimeMap(eps);
-  }
-
-  public static Map<String, Object> toUntypedRuntimeMap(List<EventProperty> eps) {
-    Map<String, Object> propertyMap = new HashMap<>();
-
-    for (EventProperty p : eps) {
-      propertyMap.putAll(PropertyUtils.getUntypedRuntimeFormat(p));
-    }
-    return propertyMap;
-  }
-
-  public static List<String> toPropertyList(List<EventProperty> eps) {
-    List<String> properties = new ArrayList<>();
-
-    for (EventProperty p : eps) {
-      properties.addAll(PropertyUtils.getFullPropertyName(p, ""));
-    }
-    return properties;
-  }
+public class EventSchemaUtils {
 
   /**
    * Returns the timestamp property of an event schema as an {@code Optional}.
diff --git a/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java
similarity index 81%
rename from streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java
rename to streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java
index 4dd3bb5bc..58797f97f 100644
--- a/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java
+++ b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.model.util;
+package org.apache.streampipes.extensions.management.util;
 
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.test.generator.EventPropertyNestedTestBuilder;
@@ -30,8 +30,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-
-public class SchemaUtilsTest {
+public class EventSchemaUtilsTest {
 
   EventProperty timestampProperty = EventPropertyPrimitiveTestBuilder.create()
       .withSemanticType(SO.DATE_TIME)
@@ -46,7 +45,7 @@ public class SchemaUtilsTest {
             EventPropertyPrimitiveTestBuilder.create().build())
         .build();
 
-    var result = SchemaUtils.getTimestampProperty(eventSchema);
+    var result = EventSchemaUtils.getTimestampProperty(eventSchema);
 
     assertFalse(result.isPresent());
   }
@@ -57,12 +56,7 @@ public class SchemaUtilsTest {
         .withEventProperty(timestampProperty)
         .build();
 
-//    var timestampProperty = new EventPropertyPrimitive();
-//    timestampProperty.setDomainProperties(List.of(URI.create(SO.DATE_TIME)));
-//    timestampProperty.setRuntimeName("timestamp");
-//    eventSchema.addEventProperty(timestampProperty);
-
-    var result = SchemaUtils.getTimestampProperty(eventSchema);
+    var result = EventSchemaUtils.getTimestampProperty(eventSchema);
 
     assertTrue(result.isPresent());
     assertEquals(result.get(), timestampProperty);
@@ -78,11 +72,9 @@ public class SchemaUtilsTest {
                 .build())
         .build();
 
-    var result = SchemaUtils.getTimestampProperty(eventSchema);
+    var result = EventSchemaUtils.getTimestampProperty(eventSchema);
 
     assertTrue(result.isPresent());
     assertEquals(result.get(), timestampProperty);
   }
-
-
 }
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 974265bc0..a07813dbb 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -30,13 +30,10 @@ import org.apache.streampipes.extensions.management.connect.adapter.preprocessin
 import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToJmsAdapterSink;
 import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
 import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToMqttAdapterSink;
+import org.apache.streampipes.extensions.management.util.EventSchemaUtils;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyList;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
@@ -53,7 +50,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 public class FileStreamProtocol extends Protocol {
@@ -85,7 +81,7 @@ public class FileStreamProtocol extends Protocol {
 
   @Override
   public void run(IAdapterPipeline adapterPipeline) {
-    String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema().getEventProperties(), "");
+    String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema());
 
     // exchange adapter pipeline sink with special purpose replay sink for file replay
     if (adapterPipeline.getPipelineSink() instanceof SendToKafkaAdapterSink) {
@@ -172,29 +168,9 @@ public class FileStreamProtocol extends Protocol {
     return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay);
   }
 
-  private String getTimestampKey(List<EventProperty> eventProperties, String prefixKey) {
-    String result = null;
-    for (EventProperty eventProperty : eventProperties) {
-      if (eventProperty instanceof EventPropertyPrimitive && eventProperty.getDomainProperties() != null) {
-        for (int i = eventProperty.getDomainProperties().size() - 1; i >= 0; i--) {
-          if (eventProperty.getDomainProperties().get(0).toString().equals("http://schema.org/DateTime")) {
-            result = prefixKey + eventProperty.getRuntimeName();
-          }
-        }
-      } else if (eventProperty instanceof EventPropertyNested
-          && ((EventPropertyNested) eventProperty).getEventProperties() != null) {
-        result = getTimestampKey(((EventPropertyNested) eventProperty).getEventProperties(),
-            prefixKey + eventProperty.getRuntimeName() + ".");
-      } else if (eventProperty instanceof EventPropertyList
-          && ((EventPropertyList) eventProperty).getEventProperty() != null) {
-        result = getTimestampKey(Arrays.asList(((EventPropertyList) eventProperty).getEventProperty()),
-            prefixKey + eventProperty.getRuntimeName() + ".");
-      }
-      if (result != null) {
-        return result;
-      }
-    }
-    return result;
+  private String getTimestampKey(EventSchema eventSchema) {
+    var timestampProperty = EventSchemaUtils.getTimestampProperty(eventSchema);
+    return timestampProperty.get().getRuntimeName();
   }
 
   @Override
diff --git a/streampipes-model/pom.xml b/streampipes-model/pom.xml
index 994507656..40bb1ca79 100644
--- a/streampipes-model/pom.xml
+++ b/streampipes-model/pom.xml
@@ -38,11 +38,6 @@
             <artifactId>streampipes-model-shared</artifactId>
             <version>0.91.0-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-test-utils</artifactId>
-            <version>0.91.0-SNAPSHOT</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-logging</artifactId>
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
index ba8a15587..48738c143 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java
@@ -19,17 +19,11 @@
 package org.apache.streampipes.model.util;
 
 import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.vocabulary.SO;
 
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 public class SchemaUtils {
 
@@ -55,32 +49,4 @@ public class SchemaUtils {
     return properties;
   }
 
-  /**
-   * Returns the timestamp property of an event schema as an {@code Optional}.
-   *
-   * <p> The method checks all properties if they are of type {@code EventPropertyPrimitive} and if their domain
-   * properties contains the uri http://schema.org/DateTime </p>
-   *
-   * @param eventSchema the event schema for which the timestamp property is to be returned
-   * @return an {@code Optional} containing the timestamp property, or an empty {@code Optional} if
-   * no such property was found
-   */
-  public static Optional<EventPropertyPrimitive> getTimestampProperty(EventSchema eventSchema) {
-    return getTimstampProperty(eventSchema.getEventProperties());
-  }
-
-
-  private static Optional<EventPropertyPrimitive> getTimstampProperty(List<EventProperty> eventProperties) {
-    for (EventProperty ep : eventProperties) {
-      if (ep instanceof EventPropertyPrimitive && ep.getDomainProperties().contains(URI.create(SO.DATE_TIME))) {
-        return Optional.of((EventPropertyPrimitive) ep);
-      }
-
-      if (ep instanceof EventPropertyNested) {
-        return getTimstampProperty(((EventPropertyNested) ep).getEventProperties());
-      }
-    }
-
-    return Optional.empty();
-  }
 }