You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/11/30 20:42:36 UTC

[incubator-streampipes-extensions] 02/02: Add runtime resolvable properties to the MergeByEnrichController

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-261
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 6a96f073a7e579db130d56cf6a85db5dca985760
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Nov 30 21:42:01 2020 +0100

    Add runtime resolvable properties to the MergeByEnrichController
---
 .../processor/enrich/MergeByEnrichController.java  | 33 +++++++++++++++++++++-
 .../strings.en                                     | 18 ++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java
index 62dee80..9a08043 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java
@@ -17,23 +17,37 @@
  */
 package org.apache.streampipes.processors.filters.jvm.processor.enrich;
 
+import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.RuntimeOptions;
+import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.processors.filters.jvm.config.FiltersJvmConfig;
+import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-public class MergeByEnrichController extends StandaloneEventProcessingDeclarer<MergeByEnrichParameters> {
+public class MergeByEnrichController extends StandaloneEventProcessingDeclarer<MergeByEnrichParameters> implements ResolvesContainerProvidedOptions {
 
   private static final String SELECT_STREAM = "select-stream";
+  private static final String GROUP_STREAM = "group-stream";
+  private static final String DISABLE_GROUPING = "disable-grouping";
+
+  private static final String ENABLE_GROUPING = "enable-grouping";
+  private static final String GROUP_CONFIG = "group-config";
+  private static final String GROUP_ID_STREAM_1 = "group_id_stream_1";
+  private static final String GROUP_ID_STREAM_2 = "group_id_stream_2";
 
   @Override
   public DataProcessorDescription declareModel() {
@@ -51,6 +65,16 @@ public class MergeByEnrichController extends StandaloneEventProcessingDeclarer<M
                     .build())
             .requiredSingleValueSelection(Labels.withId(SELECT_STREAM),
                     Options.from("Stream 1", "Stream 2"))
+
+            .requiredAlternatives(
+                    Labels.withId(GROUP_STREAM),
+                    Alternatives.from(Labels.withId(DISABLE_GROUPING)),
+
+                    Alternatives.from(Labels.withId(ENABLE_GROUPING),
+                            StaticProperties.group(Labels.withId(GROUP_CONFIG),
+                                    StaticProperties.singleValueSelectionFromContainer(Labels.withId(GROUP_ID_STREAM_1)),
+                                    StaticProperties.singleValueSelectionFromContainer(Labels.withId(GROUP_ID_STREAM_2)))))
+
             .outputStrategy(OutputStrategies.custom(true))
             .build();
   }
@@ -68,4 +92,11 @@ public class MergeByEnrichController extends StandaloneEventProcessingDeclarer<M
 
     return new ConfiguredEventProcessor<>(staticParam, MergeByEnrich::new);
   }
+
+  @Override
+  public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
+    // Placeholder implementation: requestId and parameterExtractor are not evaluated yet,
+    // so every request resolves to a new, empty, mutable option list.
+    return new ArrayList<>();
+  }
 }
diff --git a/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.enrich/strings.en b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.enrich/strings.en
index 9b5a7d4..06aba4b 100644
--- a/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.enrich/strings.en
+++ b/streampipes-processors-filters-jvm/src/main/resources/org.apache.streampipes.processors.filters.jvm.enrich/strings.en
@@ -3,3 +3,21 @@ org.apache.streampipes.processors.filters.jvm.enrich.description=One data stream
 
 select-stream.title=Select Output Frequency
 select-stream.description=The output frequency of the selected stream is maintained
+
+group-stream.title=Group Stream By Id
+group-stream.description=Define the IDs in each stream that are used for grouping
+
+disable-grouping.title=Disable Grouping
+disable-grouping.description=
+
+enable-grouping.title=Enable Grouping
+enable-grouping.description=
+
+group-config.title=Group Config
+group-config.description=
+
+group_id_stream_1.title=Group id in stream 1
+group_id_stream_1.description=
+
+group_id_stream_2.title=Group id in stream 2
+group_id_stream_2.description=