You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/16 10:46:54 UTC

[incubator-streampipes-extensions] 03/04: improved distance calculation using new utility class SpLengthCalculator

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/geodesicCalc
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 2f01d188888315c031778ed6e7b09fad896e432b
Author: micklich <fl...@disy.net>
AuthorDate: Sat May 16 12:42:09 2020 +0200

    improved distance calculation using new utility class SpLengthCalculator
---
 .../distancecalculator/DistanceCalculator.java     | 45 +++++++++---
 .../DistanceCalculatorController.java              | 84 +++++++++++++++++-----
 .../DistanceCalculatorParameters.java              | 10 ++-
 3 files changed, 109 insertions(+), 30 deletions(-)

diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculator.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculator.java
index d310f1b..481a72c 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculator.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculator.java
@@ -18,33 +18,58 @@
 
 package org.apache.streampipes.processors.geo.jvm.processor.distancecalculator;
 
+import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.geo.jvm.processor.util.DistanceUtil;
+import org.apache.streampipes.processors.geo.jvm.processor.util.SpLengthCalculator;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 
 public class DistanceCalculator implements EventProcessor<DistanceCalculatorParameters> {
 
-  private DistanceCalculatorParameters params;
+  private static Logger LOG;
+  private SpLengthCalculator length;
+
+  private String latitute1;
+  private String longitude1;
+  private String latitude2;
+  private String longitude2;
+  private Integer unit;
+
+
 
   @Override
-  public void onInvocation(DistanceCalculatorParameters numericalFilterParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
+  public void onInvocation(DistanceCalculatorParameters params, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
           runtimeContext) {
-    this.params = numericalFilterParameters;
+
+    LOG = params.getGraph().getLogger(DistanceCalculatorParameters.class);
+    this.latitute1 = params.getLat1PropertyName();
+    this.longitude1 = params.getLong1PropertyName();
+    this.latitude2 = params.getLat2PropertyName();
+    this.longitude2 = params.getLong2PropertyName();
+    this.unit = params.getUnit();
+
+    // init class with constructor
+    length = new SpLengthCalculator(params.getDecimalPosition());
+
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector out) {
 
-    float lat1 = event.getFieldBySelector(this.params.getLat1PropertyName()).getAsPrimitive().getAsFloat();
-    float long1 = event.getFieldBySelector(this.params.getLong1PropertyName()).getAsPrimitive().getAsFloat();
-    float lat2 = event.getFieldBySelector(this.params.getLat2PropertyName()).getAsPrimitive().getAsFloat();
-    float long2 = event.getFieldBySelector(this.params.getLong2PropertyName()).getAsPrimitive().getAsFloat();
+    double lat1 = event.getFieldBySelector(latitute1).getAsPrimitive().getAsDouble();
+    double lng1 = event.getFieldBySelector(longitude1).getAsPrimitive().getAsDouble();
+    double lat2 = event.getFieldBySelector(latitude2).getAsPrimitive().getAsDouble();
+    double lng2 = event.getFieldBySelector(longitude2).getAsPrimitive().getAsDouble();
+
+    length.calcGeodesicDistance(lat1, lng1, lat2, lng2);
 
-    double resultDist = DistanceUtil.dist(lat1, long1, lat2, long2);
+    if (unit != 1) {
+      length.convertUnit(unit);
+    }
 
-    event.addField("distance", resultDist);
+    event.addField(DistanceCalculatorController.LENGTH_RUNTIME, length.getLengthAsString());
+    event.addField(DistanceCalculatorController.UNIT_RUNTIME, length.getLengthUnit());
 
     out.collect(event);
   }
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculatorController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculatorController.java
index a6e1497..7bbf3a9 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculatorController.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculatorController.java
@@ -22,59 +22,105 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.geo.jvm.processor.util.SpLengthCalculator;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
 import org.apache.streampipes.vocabulary.Geo;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
 
+import java.net.URI;
+
 public class DistanceCalculatorController extends StandaloneEventProcessingDeclarer<DistanceCalculatorParameters> {
 
   private static final String LAT_1_KEY = "lat1";
   private static final String LONG_1_KEY = "long1";
   private static final String LAT_2_KEY = "lat2";
   private static final String LONG_2_KEY = "long2";
+  private static final String DECIMAL_POSITION_KEY = "decimalPosition";
+  private static final String UNIT_KEY = "unit";
+
   private static final String CALCULATED_DISTANCE_KEY = "calculatedDistance";
 
+  protected final static String LENGTH_RUNTIME = "geodesicDistance";
+  protected final static String UNIT_RUNTIME = "geodesicDistanceUnit";
 
   @Override
   public DataProcessorDescription declareModel() {
     return ProcessingElementBuilder.create("org.apache.streampipes.processors.geo.jvm.processor.distancecalculator")
-            .category(DataProcessorType.FILTER)
-            .withAssets(Assets.DOCUMENTATION)
-            .withLocales(Locales.EN)
-            .requiredStream(StreamRequirementsBuilder
-                    .create()
-                    .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lat)
-                            , Labels.withId(LAT_1_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-                    .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lng)
-                            , Labels.withId(LONG_1_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-                    .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lat)
-                            , Labels.withId(LAT_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-                    .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lng)
-                            , Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-                    .build())
-            .outputStrategy(
-                    OutputStrategies.append(EpProperties.numberEp(Labels.withId(CALCULATED_DISTANCE_KEY), "distance", SO.Number))
+        .category(DataProcessorType.FILTER)
+        .withAssets(Assets.DOCUMENTATION)
+        .withLocales(Locales.EN)
+        .requiredStream(StreamRequirementsBuilder
+            .create()
+            .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lat)
+                , Labels.withId(LAT_1_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+            .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lng)
+                , Labels.withId(LONG_1_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+            .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lat)
+                , Labels.withId(LAT_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+            .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lng)
+                , Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+            .build())
+        .requiredIntegerParameter(
+            Labels.withId(DECIMAL_POSITION_KEY), 0, 10, 1)
+        .requiredSingleValueSelection(
+            Labels.withId(UNIT_KEY),
+            Options.from(
+                SpLengthCalculator.ValidLengthUnits.METER.name(),
+                SpLengthCalculator.ValidLengthUnits.KM.name(),
+                SpLengthCalculator.ValidLengthUnits.MILE.name(),
+                SpLengthCalculator.ValidLengthUnits.FOOT.name()
             )
-            .build();
+        )
 
+        .outputStrategy(OutputStrategies.append(
+            PrimitivePropertyBuilder
+                .create(Datatypes.Double,LENGTH_RUNTIME)
+                .domainProperty(SO.Number)
+                //todo dynamic
+                //.measurementUnit(URI.create("http://qudt.org/vocab/unit#Meter"))
+                .build(),
+            PrimitivePropertyBuilder
+                .create(Datatypes.Double,UNIT_RUNTIME)
+                .domainProperty(SO.Text)
+                // todo unit type?
+                .measurementUnit(URI.create("http://qudt.org/vocab/quantitykind/Length"))
+                .build())
+        )
+        .build();
   }
 
   @Override
   public ConfiguredEventProcessor<DistanceCalculatorParameters> onInvocation
-          (DataProcessorInvocation sepa, ProcessingElementParameterExtractor extractor) {
+      (DataProcessorInvocation sepa, ProcessingElementParameterExtractor extractor) {
 
     String lat1PropertyName = extractor.mappingPropertyValue(LAT_1_KEY);
     String long11PropertyName = extractor.mappingPropertyValue(LONG_1_KEY);
     String lat2PropertyName = extractor.mappingPropertyValue(LAT_2_KEY);
     String long2PropertyName = extractor.mappingPropertyValue(LONG_2_KEY);
 
-    DistanceCalculatorParameters staticParam = new DistanceCalculatorParameters(sepa, lat1PropertyName, long11PropertyName, lat2PropertyName, long2PropertyName);
+    Integer decimalPosition = extractor.singleValueParameter(DECIMAL_POSITION_KEY, Integer.class);
+
+    String chosenUnit = extractor.selectedSingleValue(UNIT_KEY, String.class);
+
+    // convert enum to integer values default meter
+    int unit = 1;
+    if (chosenUnit.equals(SpLengthCalculator.ValidLengthUnits.KM.name())){
+      unit = SpLengthCalculator.ValidLengthUnits.KM.getNumber();
+    } else if (chosenUnit.equals(SpLengthCalculator.ValidLengthUnits.MILE.name())){
+      unit = SpLengthCalculator.ValidLengthUnits.MILE.getNumber();
+    } else if (chosenUnit.equals(SpLengthCalculator.ValidLengthUnits.FOOT.name())){
+      unit = SpLengthCalculator.ValidLengthUnits.FOOT.getNumber();
+    }
+
+    DistanceCalculatorParameters staticParam = new DistanceCalculatorParameters(sepa, lat1PropertyName, long11PropertyName, lat2PropertyName, long2PropertyName, decimalPosition, unit);
 
     return new ConfiguredEventProcessor<>(staticParam, DistanceCalculator::new);
   }
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculatorParameters.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculatorParameters.java
index a4ecdd4..187419d 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculatorParameters.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/distancecalculator/DistanceCalculatorParameters.java
@@ -27,17 +27,21 @@ public class DistanceCalculatorParameters extends EventProcessorBindingParams {
   public String long1PropertyName;
   public String lat2PropertyName;
   public String long2PropertyName;
+  protected Integer decimalPosition;
+  protected Integer unit;
 
   public DistanceCalculatorParameters(DataProcessorInvocation graph) {
     super(graph);
   }
 
-  public DistanceCalculatorParameters(DataProcessorInvocation graph, String lat1PropertyName, String long1PropertyName, String lat2PropertyName, String long2PropertyName) {
+  public DistanceCalculatorParameters(DataProcessorInvocation graph, String lat1PropertyName, String long1PropertyName, String lat2PropertyName, String long2PropertyName, Integer decimalPosition, Integer unit) {
     super(graph);
     this.lat1PropertyName = lat1PropertyName;
     this.long1PropertyName = long1PropertyName;
     this.lat2PropertyName = lat2PropertyName;
     this.long2PropertyName = long2PropertyName;
+    this.decimalPosition = decimalPosition;
+    this.unit = unit;
   }
 
 
@@ -56,4 +60,8 @@ public class DistanceCalculatorParameters extends EventProcessorBindingParams {
   public String getLong2PropertyName() {
     return long2PropertyName;
   }
+
+  public Integer getDecimalPosition() { return decimalPosition; }
+
+  public Integer getUnit() { return unit; }
 }