You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/12/11 07:41:01 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-449] Update API of rate limit processor

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new f5ff956  [STREAMPIPES-449] Update API of rate limit processor
f5ff956 is described below

commit f5ff9561743ad798fae0e6eb3176a60bddd8ee0c
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sat Dec 11 08:40:46 2021 +0100

    [STREAMPIPES-449] Update API of rate limit processor
---
 .../processors/filters/jvm/FiltersJvmInit.java     |   4 +-
 .../filters/jvm/processor/limit/RateLimit.java     |  75 ---------------
 .../jvm/processor/limit/RateLimitParameters.java   |  66 -------------
 ...imitController.java => RateLimitProcessor.java} | 103 +++++++++++++++------
 4 files changed, 76 insertions(+), 172 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index df52d22..001446b 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -31,7 +31,7 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.filters.jvm.processor.booleanfilter.BooleanFilterProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichProcessor;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
+import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.NumericalFilterProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
@@ -63,7 +63,7 @@ public class FiltersJvmInit extends StandaloneModelSubmitter {
                     new MergeBySchemaProcessor(),
                     new ComposeProcessor(),
                     new NumericalTextFilterProcessor(),
-                    new RateLimitController())
+                    new RateLimitProcessor())
             .registerMessagingFormats(
                     new JsonDataFormatFactory(),
                     new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimit.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimit.java
deleted file mode 100644
index 030c2f7..0000000
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimit.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.limit;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.util.WindowFactory;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.window.Window;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class RateLimit implements EventProcessor<RateLimitParameters> {
-    private final static String DEFAULT_GROUP = "default";
-    private Boolean groupingEnabled;
-    private String groupingField;
-    private ConcurrentMap<Object, Window> windows;
-    private WindowFactory factory;
-
-    @Override
-    public void onInvocation(RateLimitParameters parameters,
-                             SpOutputCollector outputCollector,
-                             EventProcessorRuntimeContext runtimeContext) {
-        this.groupingEnabled = parameters.getGroupingEnabled();
-        this.groupingField = parameters.getGroupingField();
-        this.windows = new ConcurrentHashMap<>();
-        this.factory = new WindowFactory(
-                parameters.getWindowType(),
-                parameters.getWindowExpression(),
-                parameters.getEventSelection(),
-                outputCollector);
-    }
-
-    @Override
-    public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
-        Object group = groupingEnabled ? getGroupKey(event) : DEFAULT_GROUP;
-        Window window = windows.get(group);
-        if (window == null) {
-            window = factory.create();
-            window.init();
-            windows.put(group, window);
-        }
-        window.onEvent(event);
-    }
-
-    @Override
-    public void onDetach() throws SpRuntimeException {
-        for (Window window : this.windows.values()) {
-            window.destroy();
-        }
-    }
-
-    private Object getGroupKey(Event event) {
-        return event.getFieldBySelector(groupingField).getAsPrimitive().getAsString();
-    }
-
-}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitParameters.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitParameters.java
deleted file mode 100644
index d7757e4..0000000
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitParameters.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.limit;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.util.EventSelection;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.util.WindowType;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class RateLimitParameters extends EventProcessorBindingParams {
-    private EventSelection eventSelection;
-    private WindowType windowType;
-    private Boolean groupingEnabled;
-    private String groupingField;
-    private Object windowExpression;
-
-    public RateLimitParameters(DataProcessorInvocation graph,
-                               WindowType windowType,
-                               Object windowExpression,
-                               Boolean groupingEnabled,
-                               String groupingField,
-                               EventSelection eventSelection) {
-        super(graph);
-        this.eventSelection = eventSelection;
-        this.windowType = windowType;
-        this.groupingEnabled = groupingEnabled;
-        this.groupingField = groupingField;
-        this.windowExpression = windowExpression;
-    }
-
-    public WindowType getWindowType() {
-        return windowType;
-    }
-
-    public Object getWindowExpression() {
-        return windowExpression;
-    }
-
-    public Boolean getGroupingEnabled() {
-        return groupingEnabled;
-    }
-
-    public String getGroupingField() {
-        return groupingField;
-    }
-
-    public EventSelection getEventSelection() {
-        return eventSelection;
-    }
-
-}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitController.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessor.java
similarity index 56%
rename from streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitController.java
rename to streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessor.java
index 68242de..c3025bf 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitController.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessor.java
@@ -15,29 +15,33 @@
  * limitations under the License.
  *
  */
+
 package org.apache.streampipes.processors.filters.jvm.processor.limit;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.filters.jvm.processor.limit.util.EventSelection;
+import org.apache.streampipes.processors.filters.jvm.processor.limit.util.WindowFactory;
 import org.apache.streampipes.processors.filters.jvm.processor.limit.util.WindowType;
+import org.apache.streampipes.processors.filters.jvm.processor.limit.window.Window;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class RateLimitProcessor extends StreamPipesDataProcessor {
 
-public class RateLimitController extends StandaloneEventProcessingDeclarer<RateLimitParameters> {
     private static final String EVENT_SELECTION = "event-selection";
     private static final String WINDOW_TYPE = "window-type";
     private static final String LENGTH_WINDOW = "length-window";
@@ -54,6 +58,13 @@ public class RateLimitController extends StandaloneEventProcessingDeclarer<RateL
     private static final String OPTION_LAST = "Last";
     private static final String OPTION_ALL = "All";
 
+    private final static String DEFAULT_GROUP = "default";
+    private Boolean groupingEnabled;
+    private String groupingField;
+    private ConcurrentMap<Object, Window> windows;
+    private WindowFactory factory;
+
+
     @Override
     public DataProcessorDescription declareModel() {
         return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.limit")
@@ -80,29 +91,63 @@ public class RateLimitController extends StandaloneEventProcessingDeclarer<RateL
     }
 
     @Override
-    public ConfiguredEventProcessor<RateLimitParameters> onInvocation(DataProcessorInvocation graph,
-                                                                      ProcessingElementParameterExtractor extractor) {
-        Boolean groupingEnabled = Boolean.valueOf(extractor.selectedSingleValue(GROUPING_ENABLED, String.class));
-        String groupingField = extractor.mappingPropertyValue(GROUPING_FIELD);
-        EventSelection eventSelection = EventSelection.valueOf(extractor
+      public void onInvocation(ProcessorParams
+        processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+
+        this.groupingEnabled = Boolean.valueOf(processorParams.extractor().selectedSingleValue(GROUPING_ENABLED, String.class));
+        this.groupingField = processorParams.extractor().mappingPropertyValue(GROUPING_FIELD);
+        this.windows = new ConcurrentHashMap<>();
+
+        EventSelection eventSelection = EventSelection.valueOf(processorParams.extractor()
                 .selectedSingleValue(EVENT_SELECTION, String.class).toUpperCase());
-        String windowType = extractor.selectedAlternativeInternalId(WINDOW_TYPE);
+        String windowType = processorParams.extractor().selectedAlternativeInternalId(WINDOW_TYPE);
+
         if (TIME_WINDOW.equals(windowType)) {
-            Integer windowSize = extractor.singleValueParameter(TIME_WINDOW_SIZE, Integer.class);
-            RateLimitParameters params = new RateLimitParameters(graph, WindowType.TIME,
-                    windowSize, groupingEnabled, groupingField, eventSelection);
-            return new ConfiguredEventProcessor<>(params, RateLimit::new);
+            Integer windowSize = processorParams.extractor().singleValueParameter(TIME_WINDOW_SIZE, Integer.class);
+            this.factory = new WindowFactory(
+                    WindowType.TIME,
+                    windowSize,
+                    eventSelection,
+                    spOutputCollector);
+
         } else if (CRON_WINDOW.equals(windowType)) {
-            String cronExpression = extractor.singleValueParameter(CRON_WINDOW_EXPR, String.class);
-            RateLimitParameters params = new RateLimitParameters(graph, WindowType.CRON,
-                    cronExpression, groupingEnabled, groupingField, eventSelection);
-            return new ConfiguredEventProcessor<>(params, RateLimit::new);
+            String cronExpression = processorParams.extractor().singleValueParameter(CRON_WINDOW_EXPR, String.class);
+            this.factory = new WindowFactory(
+                    WindowType.CRON,
+                    cronExpression,
+                    eventSelection,
+                    spOutputCollector);
+
         } else {
-            Integer windowSize = extractor.singleValueParameter(LENGTH_WINDOW_SIZE, Integer.class);
-            RateLimitParameters params = new RateLimitParameters(graph, WindowType.LENGTH,
-                    windowSize, groupingEnabled, groupingField, eventSelection);
-            return new ConfiguredEventProcessor<>(params, RateLimit::new);
+            Integer windowSize = processorParams.extractor().singleValueParameter(LENGTH_WINDOW_SIZE, Integer.class);
+            this.factory = new WindowFactory(
+                    WindowType.LENGTH,
+                    windowSize,
+                    eventSelection,
+                    spOutputCollector);
         }
     }
 
+    @Override
+    public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+        Object group = groupingEnabled ? getGroupKey(event) : DEFAULT_GROUP;
+        Window window = windows.get(group);
+        if (window == null) {
+            window = factory.create();
+            window.init();
+            windows.put(group, window);
+        }
+        window.onEvent(event);
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        for (Window window : this.windows.values()) {
+            window.destroy();
+        }
+    }
+
+    private Object getGroupKey(Event event) {
+        return event.getFieldBySelector(groupingField).getAsPrimitive().getAsString();
+    }
 }