You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2022/11/23 22:53:09 UTC

[streampipes] 01/16: rewrite to 1-class processor model

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch STREAMPIPES-642
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 260e1e49bf818d62b7141830686a56c24a4b2452
Author: micklich <mi...@apache.org>
AuthorDate: Sat Nov 19 23:35:11 2022 +0100

    rewrite to 1-class processor model
---
 .../streampipes/processors/geo/jvm/GeoJvmInit.java |  4 +-
 .../geo/jvm/jts/processor/epsg/EpsgProcessor.java  | 80 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index 949887050..52897a751 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -29,8 +29,8 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.geo.jvm.config.ConfigKeys;
+import org.apache.streampipes.processors.geo.jvm.jts.processor.epsg.EpsgProcessor;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoController;
-import org.apache.streampipes.processors.geo.jvm.jts.processor.setEPSG.SetEpsgController;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory.CreateTrajectoryFromPointsController;
 import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.DistanceCalculatorController;
 import org.apache.streampipes.processors.geo.jvm.processor.geocoder.GoogleMapsGeocodingController;
@@ -52,7 +52,7 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
                     new GoogleMapsGeocodingController(),
                     new StaticGoogleMapsGeocodingController(),
                     new ReverseGeocodingController(),
-                    new SetEpsgController(),
+                    new EpsgProcessor(),
                     new LatLngToGeoController(),
                     new CreateTrajectoryFromPointsController(),
                     new SpeedCalculatorController(),
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
new file mode 100644
index 000000000..44057d9d2
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
@@ -0,0 +1,80 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *  *
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.epsg;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+public class EpsgProcessor extends StreamPipesDataProcessor { // adds a configured integer EPSG code to each event
+    private static final String EPSG_KEY  = "epsg-key"; // label id of the required integer parameter
+    public static final String EPSG_RUNTIME = "epsg"; // name of the field added to each event
+    // Value of the EPSG_KEY parameter; assigned in onInvocation(), read in onEvent().
+    private int epsgCode;
+    // Describes the processor: GEO category, append output strategy, one required integer parameter.
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder.create("org.apache.streampipes.processors.geo.jvm.jts.processor.epsg")
+                .category(DataProcessorType.GEO)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .withLocales(Locales.EN)
+                .requiredStream(StreamRequirementsBuilder
+                        .create()
+                        .build()) // requirements are built without adding any property requirement
+                .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+                                .create(Datatypes.Integer, EPSG_RUNTIME) // appended property: Integer named "epsg"
+                                .domainProperty("http://data.ign.fr/def/ignf#CartesianCS")
+                                .build())
+                )
+                .requiredIntegerParameter(Labels.withId(EPSG_KEY), 4326) // NOTE(review): 4326 is presumably the default value - confirm
+                .build();
+    }
+    // Reads the EPSG_KEY parameter as an Integer and stores it in epsgCode for later events.
+    @Override
+    public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
+                             EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+        this.epsgCode = parameters.extractor().singleValueParameter(EPSG_KEY, Integer.class);
+    }
+    // Writes epsgCode into the event under EPSG_RUNTIME, then passes the event to the collector.
+    @Override
+    public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+        event.addField(EpsgProcessor.EPSG_RUNTIME, this.epsgCode);
+        collector.collect(event);
+    }
+    // No action is taken on detach; the method body is empty.
+    @Override
+    public void onDetach() throws SpRuntimeException {
+
+    }
+}