You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/03/22 20:50:03 UTC

[incubator-streampipes-extensions] branch dev updated: Add StaticDistanceCalculator processor

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0697b24  Add StaticDistanceCalculator processor
0697b24 is described below

commit 0697b247f4dc156504c71ac7289ec9f9e96bbc9a
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Mar 22 21:49:44 2020 +0100

    Add StaticDistanceCalculator processor
---
 .../pe/jvm/AllPipelineElementsInit.java            |  4 +-
 .../streampipes/processors/geo/jvm/GeoJvmInit.java |  4 +-
 .../StaticDistanceCalculator.java                  | 60 +++++++++++++++
 .../StaticDistanceCalculatorController.java        | 87 ++++++++++++++++++++++
 .../StaticDistanceCalculatorParameters.java        | 65 ++++++++++++++++
 .../documentation.md                               |  5 +-
 .../documentation.md                               | 11 +--
 .../strings.en                                     | 14 ++++
 8 files changed, 238 insertions(+), 12 deletions(-)

diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index 6869848..0459d7c 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -41,6 +41,7 @@ import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.Di
 import org.apache.streampipes.processors.geo.jvm.processor.geocoder.GoogleMapsGeocodingController;
 import org.apache.streampipes.processors.geo.jvm.processor.revgeocoder.ReverseGeocodingController;
 import org.apache.streampipes.processors.geo.jvm.processor.speed.SpeedCalculatorController;
+import org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator.StaticDistanceCalculatorController;
 import org.apache.streampipes.processors.geo.jvm.processor.staticgeocoder.StaticGoogleMapsGeocodingController;
 import org.apache.streampipes.processors.imageprocessing.jvm.processor.genericclassification.GenericImageClassificationController;
 import org.apache.streampipes.processors.imageprocessing.jvm.processor.imagecropper.ImageCropperController;
@@ -131,7 +132,8 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
             .add(new ReverseGeocodingController())
             .add(new SetEpsgController())
             .add(new LatLngToGeoController())
-            .add(new SpeedCalculatorController());
+            .add(new SpeedCalculatorController())
+            .add(new StaticDistanceCalculatorController());
 
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index 34df051..f82df33 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.Di
 import org.apache.streampipes.processors.geo.jvm.processor.geocoder.GoogleMapsGeocodingController;
 import org.apache.streampipes.processors.geo.jvm.processor.revgeocoder.ReverseGeocodingController;
 import org.apache.streampipes.processors.geo.jvm.processor.speed.SpeedCalculatorController;
+import org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator.StaticDistanceCalculatorController;
 import org.apache.streampipes.processors.geo.jvm.processor.staticgeocoder.StaticGoogleMapsGeocodingController;
 
 public class GeoJvmInit extends StandaloneModelSubmitter {
@@ -46,7 +47,8 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
         .add(new ReverseGeocodingController())
         .add(new SetEpsgController())
         .add(new LatLngToGeoController())
-        .add(new SpeedCalculatorController());
+        .add(new SpeedCalculatorController())
+        .add(new StaticDistanceCalculatorController());
 
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculator.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculator.java
new file mode 100644
index 0000000..2889388
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculator.java
@@ -0,0 +1,60 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.geo.jvm.processor.util.DistanceUtil;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+public class StaticDistanceCalculator implements EventProcessor<StaticDistanceCalculatorParameters> { // Appends a "distance" field to each event: the distance between the event's latitude/longitude and one fixed, user-selected location.
+
+  private String latitudeFieldName; // selector of the event field that holds the latitude
+  private String longitudeFieldName; // selector of the event field that holds the longitude
+
+  private Float selectedLocationLatitude; // latitude of the fixed reference location
+  private Float selectedLocationLongitude; // longitude of the fixed reference location
+
+  @Override
+  public void onInvocation(StaticDistanceCalculatorParameters parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException { // copies the configuration out of the parameters; collector and runtime context are not used here
+    this.latitudeFieldName = parameters.getLatitudeFieldName();
+    this.longitudeFieldName = parameters.getLongitudeFieldName();
+
+    this.selectedLocationLatitude = parameters.getSelectedLatitude();
+    this.selectedLocationLongitude = parameters.getSelectedLongitude();
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException { // enriches the incoming event in place and forwards that same event
+    Float latitude = event.getFieldBySelector(latitudeFieldName).getAsPrimitive().getAsFloat(); // NOTE(review): no guard for a missing or non-numeric field; presumably ensured by the stream requirements, confirm
+    Float longitude = event.getFieldBySelector(longitudeFieldName).getAsPrimitive().getAsFloat();
+
+    Float distance = DistanceUtil.dist(latitude, longitude, selectedLocationLatitude,
+            selectedLocationLongitude); // argument order: event position first, fixed location second; unit is presumably km (DistanceUtil is not visible here, confirm)
+
+    event.addField("distance", distance); // name must match the "distance" property declared in StaticDistanceCalculatorController's output strategy
+
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException { // intentionally empty: the fields above are plain values, nothing to release
+
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorController.java
new file mode 100644
index 0000000..2192e7d
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorController.java
@@ -0,0 +1,87 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator;
+
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.Geo;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.net.URI;
+
+public class StaticDistanceCalculatorController extends StandaloneEventProcessingDeclarer<StaticDistanceCalculatorParameters> { // Declares the Static Distance Calculator (stream requirements, static parameters, output schema) and builds its runtime instance.
+
+  private static final String LATITUDE_KEY = "latitude-key"; // label/mapping id; title and description live under the same key in strings.en
+  private static final String LONGITUDE_KEY = "longitude-key" ; // label/mapping id for the longitude field mapping
+  private static final String SELECTED_LATITUDE_KEY = "selected-latitude-key"; // id of the static parameter holding the fixed location's latitude
+  private static final String SELECTED_LONGITUDE_KEY = "selected-longitude-key"; // id of the static parameter holding the fixed location's longitude
+
+  @Override
+  public DataProcessorDescription declareModel() { // builds the processor description
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.geo.jvm.processor" +
+            ".staticdistancecalculator") // same id as the resources folder and the key prefix used in strings.en
+            .category(DataProcessorType.FILTER) // NOTE(review): this processor appends a field rather than filtering events; confirm FILTER is the intended category
+            .withAssets(Assets.DOCUMENTATION)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                    .create()
+                    .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lat) // input must offer a property with domain property Geo.lat
+                            , Labels.withId(LATITUDE_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+                    .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lng) // and one with domain property Geo.lng
+                            , Labels.withId(LONGITUDE_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+                    .build())
+            .requiredFloatParameter(Labels.withId(SELECTED_LATITUDE_KEY)) // fixed location is entered as two float parameters
+            .requiredFloatParameter(Labels.withId(SELECTED_LONGITUDE_KEY))
+            .outputStrategy(
+                    OutputStrategies.append(PrimitivePropertyBuilder // output = input schema plus one appended property
+                            .create(Datatypes.Float,"distance") // must match the field name added in StaticDistanceCalculator.onEvent
+                            .domainProperty(SO.Number)
+                            .measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer")) // declares km; presumably matches what DistanceUtil.dist returns, confirm
+                            .build())
+            )
+            .build();
+
+  }
+
+  @Override
+  public ConfiguredEventProcessor<StaticDistanceCalculatorParameters> onInvocation(DataProcessorInvocation graph,
+                                                  ProcessingElementParameterExtractor extractor) { // reads the user's configuration and pairs it with a processor factory
+    String latitudeFieldName = extractor.mappingPropertyValue(LATITUDE_KEY); // selector of the event field mapped to latitude
+    String longitudeFieldName = extractor.mappingPropertyValue(LONGITUDE_KEY); // selector of the event field mapped to longitude
+    Float selectedLatitude = extractor.singleValueParameter(SELECTED_LATITUDE_KEY, Float.class);
+    Float selectedLongitude = extractor.singleValueParameter(SELECTED_LONGITUDE_KEY, Float.class);
+
+    StaticDistanceCalculatorParameters staticParam = new StaticDistanceCalculatorParameters(graph,
+            latitudeFieldName, longitudeFieldName, selectedLatitude, selectedLongitude);
+
+    return new ConfiguredEventProcessor<>(staticParam, StaticDistanceCalculator::new); // constructor reference acts as the processor factory
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorParameters.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorParameters.java
new file mode 100644
index 0000000..0d7e347
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorParameters.java
@@ -0,0 +1,65 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+public class StaticDistanceCalculatorParameters extends EventProcessorBindingParams { // Configuration holder handed from StaticDistanceCalculatorController to StaticDistanceCalculator.
+
+  private String latitudeFieldName; // selector of the event field holding the latitude
+  private String longitudeFieldName; // selector of the event field holding the longitude
+
+  private Float selectedLatitude; // latitude of the fixed location
+  private Float selectedLongitude; // longitude of the fixed location
+
+  public StaticDistanceCalculatorParameters(DataProcessorInvocation graph,
+                                            String latitudeFieldName,
+                                            String longitudeFieldName,
+                                            Float selectedLatitude,
+                                            Float selectedLongitude) { // stores all four values as given; no validation is performed
+    super(graph);
+    this.latitudeFieldName = latitudeFieldName;
+    this.longitudeFieldName = longitudeFieldName;
+    this.selectedLatitude = selectedLatitude;
+    this.selectedLongitude = selectedLongitude;
+  }
+
+  public String getLatitudeFieldName() { // read-only: no matching setter exists
+    return latitudeFieldName;
+  }
+
+  public String getLongitudeFieldName() { // read-only: no matching setter exists
+    return longitudeFieldName;
+  }
+
+  public Float getSelectedLatitude() {
+    return selectedLatitude;
+  }
+
+  public void setSelectedLatitude(Float selectedLatitude) { // NOTE(review): only the selected coordinates are mutable and no caller is visible in this commit; confirm the setters are needed
+    this.selectedLatitude = selectedLatitude;
+  }
+
+  public Float getSelectedLongitude() {
+    return selectedLongitude;
+  }
+
+  public void setSelectedLongitude(Float selectedLongitude) { // NOTE(review): see setSelectedLatitude; no caller is visible in this commit
+    this.selectedLongitude = selectedLongitude;
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
index d43213b..f221b91 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## Spatial Grid Enrichment
+## Distance Calculator
 
 <p align="center"> 
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,8 +26,7 @@
 
 ## Description
 
-Groups spatial events into cells of a given size
-Add a detailed description here
+Calculates the distance between two latitude/longitude pairs in a single event.
 
 ***
 
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/documentation.md
similarity index 80%
copy from streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
copy to streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/documentation.md
index d43213b..eb70028 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/documentation.md
@@ -16,23 +16,20 @@
   ~
   -->
 
-## Spatial Grid Enrichment
-
-<p align="center"> 
-    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
-</p>
+## Static Distance Calculator
 
 ***
 
 ## Description
 
-Groups spatial events into cells of a given size
-Add a detailed description here
+Calculates the distance (in km) between a fixed location (e.g., a place) and a latitude/longitude pair of an input
+ event.
 
 ***
 
 ## Required input
 
+Requires a data stream that provides latitude and longitude values.
 
 ***
 
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/strings.en b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/strings.en
new file mode 100644
index 0000000..4238a5b
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/strings.en
@@ -0,0 +1,14 @@
+org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator.title=Static Distance Calculator
+org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator.description=Calculates the distance between a fixed location and a moving location
+
+latitude-key.title=Latitude field
+latitude-key.description=
+
+longitude-key.title=Longitude field
+longitude-key.description=
+
+selected-latitude-key.title=Latitude
+selected-latitude-key.description=The latitude value of the fixed location
+
+selected-longitude-key.title=Longitude
+selected-longitude-key.description=The longitude value of the fixed location
\ No newline at end of file