You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/28 10:00:05 UTC

[incubator-streampipes] branch dev updated (b420738 -> 020f6a7)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from b420738  Merge pull request #69 from apache/STREAMPIPES-483
     new a6d78cb  [STREAMPIPES-487] Refactor Flink init classes to use service builder pattern
     new 89c3842  [STREAMPIPES-486] Bump Flink version, add log4j to dependency management
     new 8f96f57  [STREAMPIPES-487] Refactor Flink-based pipeline elements
     new 020f6a7  Merge branch 'dev' of github.com:apache/incubator-streampipes into dev

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../standalone/jobmanager/docker-compose.yml       |   2 +-
 .../standalone/taskmanager/docker-compose.yml      |   2 +-
 installer/compose/docker-compose.full.yml          |   4 +-
 installer/k8s/values.yaml                          |   2 +-
 pom.xml                                            |  14 ++-
 .../streampipes/client/StreamPipesClient.java      |   5 +-
 .../container/config/ConfigExtractor.java          |   4 +-
 streampipes-extensions/pom.xml                     |   6 ++
 .../pom.xml                                        |   1 +
 .../pe/flink/AllFlinkPipelineElementsInit.java     | 108 +++++++--------------
 .../apache/streampipes/pe/flink/config/Config.java |  76 ---------------
 .../streampipes/pe/flink/config/ConfigKeys.java    |  27 ------
 .../pom.xml                                        |  15 +++
 .../flink/AbstractAggregationProgram.java          |  26 +++--
 .../aggregation/flink/AggregationFlinkInit.java    |  51 ++++++----
 .../flink/config/AggregationFlinkConfig.java       |  81 ----------------
 .../aggregation/flink/config/ConfigKeys.java       |  10 +-
 .../aggregation/AggregationController.java         |   6 +-
 .../processor/aggregation/AggregationProgram.java  |   8 +-
 .../flink/processor/count/CountController.java     |  11 +--
 .../flink/processor/count/CountProgram.java        |   8 +-
 .../processor/eventcount/EventCountController.java |  11 +--
 .../processor/eventcount/EventCountProgram.java    |   8 +-
 .../flink/processor/rate/EventRateController.java  |   9 +-
 .../flink/processor/rate/EventRateProgram.java     |   8 +-
 .../aggregation/TestTimeAggregationProgram.java    |   9 +-
 .../flink/processor/count/TestCountProgram.java    |  10 +-
 .../flink/processor/rate/TestRateProgram.java      |   9 +-
 .../enricher/flink/AbstractEnricherProgram.java    |  26 +++--
 .../enricher/flink/EnricherFlinkInit.java          |  51 ++++++----
 .../enricher/flink/config/ConfigKeys.java          |  10 +-
 .../enricher/flink/config/EnricherFlinkConfig.java |  80 ---------------
 .../processor/math/mathop/MathOpController.java    |  17 +---
 .../flink/processor/math/mathop/MathOpProgram.java |   8 +-
 .../math/staticmathop/StaticMathOpController.java  |  16 +--
 .../math/staticmathop/StaticMathOpProgram.java     |   8 +-
 .../processor/timestamp/TimestampController.java   |   9 +-
 .../processor/timestamp/TimestampProgram.java      |   8 +-
 .../trigonometry/TrigonometryController.java       |   3 +-
 .../trigonometry/TrigonometryProgram.java          |   8 +-
 .../UrlDereferencingController.java                |   3 +-
 .../urldereferencing/UrlDereferencingProgram.java  |   8 +-
 .../processor/geo/flink/AbstractGeoProgram.java    |  27 +++---
 .../processor/geo/flink/GeoFlinkInit.java          |  43 +++++---
 .../processor/geo/flink/config/ConfigKeys.java     |  10 +-
 .../processor/geo/flink/config/GeoFlinkConfig.java |  79 ---------------
 .../SpatialGridEnrichmentController.java           |   3 +-
 .../gridenricher/SpatialGridEnrichmentProgram.java |   8 +-
 .../flink/AbstractPatternDetectionProgram.java     |  26 +++--
 .../detection/flink/PatternDetectionFlinkInit.java |  50 ++++++----
 .../pattern/detection/flink/config/ConfigKeys.java |  10 +-
 .../flink/config/PatternDetectionFlinkConfig.java  |  80 ---------------
 .../flink/processor/absence/AbsenceController.java |   3 +-
 .../flink/processor/absence/AbsenceProgram.java    |   8 +-
 .../flink/processor/and/AndController.java         |   3 +-
 .../detection/flink/processor/and/AndProgram.java  |   8 +-
 .../processor/peak/PeakDetectionController.java    |   6 +-
 .../flink/processor/peak/PeakDetectionProgram.java |   8 +-
 .../processor/sequence/SequenceController.java     |   3 +-
 .../flink/processor/sequence/SequenceProgram.java  |   8 +-
 .../detection/processor/absence/TestAbsence.java   |  17 ++--
 .../pattern/detection/processor/and/TestAnd.java   |  17 ++--
 .../flink/AbstractStatisticsProgram.java           |  26 +++--
 .../statistics/flink/StatisticsFlinkInit.java      |  45 +++++----
 .../statistics/flink/config/ConfigKeys.java        |  10 +-
 .../flink/config/StatisticsFlinkConfig.java        |  80 ---------------
 .../stat/summary/StatisticsSummaryController.java  |   9 +-
 .../stat/summary/StatisticsSummaryProgram.java     |  12 +--
 .../window/StatisticsSummaryControllerWindow.java  |   3 +-
 .../window/StatisticsSummaryProgramWindow.java     |  17 ++--
 .../flink/AbstractTextMiningProgram.java           |  26 +++--
 .../textmining/flink/TextMiningFlinkInit.java      |   5 +
 .../textmining/flink/config/ConfigKeys.java        |  10 +-
 .../flink/config/TextMiningFlinkConfig.java        |  81 ----------------
 .../language/LanguageDetectionController.java      |   3 +-
 .../language/LanguageDetectionProgram.java         |  12 +--
 .../processor/wordcount/WordCountController.java   |   3 +-
 .../processor/wordcount/WordCountProgram.java      |  12 +--
 .../flink/AbstractFlinkTransformationProgram.java  |  26 +++--
 .../flink/TransformationFlinkInit.java             |  53 ++++++----
 .../transformation/flink/config/ConfigKeys.java    |  12 +--
 .../flink/config/TransformationFlinkConfig.java    |  80 ---------------
 .../boilerplate/BoilerplateController.java         |   3 +-
 .../processor/boilerplate/BoilerplateProgram.java  |  12 +--
 .../converter/FieldConverterController.java        |   9 +-
 .../processor/converter/FieldConverterProgram.java |  12 +--
 .../processor/hasher/FieldHasherController.java    |   8 +-
 .../flink/processor/hasher/FieldHasherProgram.java |  12 +--
 .../processor/mapper/FieldMapperController.java    |   2 +-
 .../flink/processor/mapper/FieldMapperProgram.java |  12 +--
 .../MeasurementUnitConverterController.java        |   2 +-
 .../MeasurementUnitConverterProgram.java           |  12 +--
 .../processor/rename/FieldRenamerController.java   |   8 +-
 .../processor/rename/FieldRenamerProgram.java      |  12 +--
 .../processor/converter/TestConverterProgram.java  |   9 +-
 .../processor/hasher/TestFieldHasherProgram.java   |  19 ++--
 .../flink/processor/rename/TestRenameProgram.java  |   9 +-
 .../sinks/databases/flink/DatabasesFlinkInit.java  |  44 ++++++---
 .../sinks/databases/flink/config/ConfigKeys.java   |  15 ++-
 .../flink/config/DatabasesFlinkConfig.java         |  95 ------------------
 .../elasticsearch/ElasticSearchController.java     |   8 +-
 .../elasticsearch/ElasticSearchParameters.java     |  18 +++-
 .../flink/elasticsearch/ElasticSearchProgram.java  |  34 ++++---
 .../svcdiscovery/consul/ConsulSpConfig.java        |   1 +
 .../distributed/runtime/DistributedRuntime.java    |  11 ++-
 .../wrapper/flink/FlinkDataProcessorRuntime.java   |  31 +++---
 .../wrapper/flink/FlinkDataSinkRuntime.java        |  31 +++---
 .../wrapper/flink/FlinkDeploymentConfig.java       |  17 +++-
 .../flink/FlinkMiniClusterDeploymentConfig.java    |   8 ++
 .../streampipes/wrapper/flink/FlinkRuntime.java    |  45 ++-------
 .../runtime/EventProcessorRuntimeParams.java       |   4 +-
 111 files changed, 798 insertions(+), 1433 deletions(-)
 delete mode 100644 streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java
 delete mode 100644 streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java
 delete mode 100644 streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/config/AggregationFlinkConfig.java
 delete mode 100644 streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/config/EnricherFlinkConfig.java
 delete mode 100644 streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/config/GeoFlinkConfig.java
 delete mode 100644 streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/PatternDetectionFlinkConfig.java
 delete mode 100644 streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/StatisticsFlinkConfig.java
 delete mode 100644 streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/TextMiningFlinkConfig.java
 delete mode 100644 streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/config/TransformationFlinkConfig.java
 delete mode 100644 streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/config/DatabasesFlinkConfig.java
 create mode 100644 streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java

[incubator-streampipes] 03/04: [STREAMPIPES-487] Refactor Flink-based pipeline elements

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 8f96f57ac07def464d5a2c4ae47b6211f7609420
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Dec 28 10:59:23 2021 +0100

    [STREAMPIPES-487] Refactor Flink-based pipeline elements
---
 .../streampipes/client/StreamPipesClient.java      |  5 +-
 .../container/config/ConfigExtractor.java          |  4 +-
 .../apache/streampipes/pe/flink/config/Config.java | 76 -----------------
 .../streampipes/pe/flink/config/ConfigKeys.java    | 27 ------
 .../flink/AbstractAggregationProgram.java          | 26 +++---
 .../aggregation/flink/AggregationFlinkInit.java    |  9 +-
 .../flink/config/AggregationFlinkConfig.java       | 81 ------------------
 .../aggregation/flink/config/ConfigKeys.java       | 10 +--
 .../aggregation/AggregationController.java         |  6 +-
 .../processor/aggregation/AggregationProgram.java  |  8 +-
 .../flink/processor/count/CountController.java     | 11 +--
 .../flink/processor/count/CountProgram.java        |  8 +-
 .../processor/eventcount/EventCountController.java | 11 +--
 .../processor/eventcount/EventCountProgram.java    |  8 +-
 .../flink/processor/rate/EventRateController.java  |  9 +-
 .../flink/processor/rate/EventRateProgram.java     |  8 +-
 .../aggregation/TestTimeAggregationProgram.java    |  9 +-
 .../flink/processor/count/TestCountProgram.java    | 10 ++-
 .../flink/processor/rate/TestRateProgram.java      |  9 +-
 .../enricher/flink/AbstractEnricherProgram.java    | 26 +++---
 .../enricher/flink/EnricherFlinkInit.java          |  5 ++
 .../enricher/flink/config/ConfigKeys.java          | 10 +--
 .../enricher/flink/config/EnricherFlinkConfig.java | 80 ------------------
 .../processor/math/mathop/MathOpController.java    | 17 +---
 .../flink/processor/math/mathop/MathOpProgram.java |  8 +-
 .../math/staticmathop/StaticMathOpController.java  | 16 +---
 .../math/staticmathop/StaticMathOpProgram.java     |  8 +-
 .../processor/timestamp/TimestampController.java   |  9 +-
 .../processor/timestamp/TimestampProgram.java      |  8 +-
 .../trigonometry/TrigonometryController.java       |  3 +-
 .../trigonometry/TrigonometryProgram.java          |  8 +-
 .../UrlDereferencingController.java                |  3 +-
 .../urldereferencing/UrlDereferencingProgram.java  |  8 +-
 .../processor/geo/flink/AbstractGeoProgram.java    | 27 +++---
 .../processor/geo/flink/GeoFlinkInit.java          |  5 ++
 .../processor/geo/flink/config/ConfigKeys.java     | 10 +--
 .../processor/geo/flink/config/GeoFlinkConfig.java | 79 ------------------
 .../SpatialGridEnrichmentController.java           |  3 +-
 .../gridenricher/SpatialGridEnrichmentProgram.java |  8 +-
 .../flink/AbstractPatternDetectionProgram.java     | 26 +++---
 .../detection/flink/PatternDetectionFlinkInit.java |  9 +-
 .../pattern/detection/flink/config/ConfigKeys.java | 10 +--
 .../flink/config/PatternDetectionFlinkConfig.java  | 80 ------------------
 .../flink/processor/absence/AbsenceController.java |  3 +-
 .../flink/processor/absence/AbsenceProgram.java    |  8 +-
 .../flink/processor/and/AndController.java         |  3 +-
 .../detection/flink/processor/and/AndProgram.java  |  8 +-
 .../processor/peak/PeakDetectionController.java    |  6 +-
 .../flink/processor/peak/PeakDetectionProgram.java |  8 +-
 .../processor/sequence/SequenceController.java     |  3 +-
 .../flink/processor/sequence/SequenceProgram.java  |  8 +-
 .../detection/processor/absence/TestAbsence.java   | 17 ++--
 .../pattern/detection/processor/and/TestAnd.java   | 17 ++--
 .../flink/AbstractStatisticsProgram.java           | 26 +++---
 .../statistics/flink/StatisticsFlinkInit.java      |  5 ++
 .../statistics/flink/config/ConfigKeys.java        | 10 +--
 .../flink/config/StatisticsFlinkConfig.java        | 80 ------------------
 .../stat/summary/StatisticsSummaryController.java  |  9 +-
 .../stat/summary/StatisticsSummaryProgram.java     | 12 +--
 .../window/StatisticsSummaryControllerWindow.java  |  3 +-
 .../window/StatisticsSummaryProgramWindow.java     | 17 ++--
 .../flink/AbstractTextMiningProgram.java           | 26 +++---
 .../textmining/flink/TextMiningFlinkInit.java      |  5 ++
 .../textmining/flink/config/ConfigKeys.java        | 10 +--
 .../flink/config/TextMiningFlinkConfig.java        | 81 ------------------
 .../language/LanguageDetectionController.java      |  3 +-
 .../language/LanguageDetectionProgram.java         | 12 +--
 .../processor/wordcount/WordCountController.java   |  3 +-
 .../processor/wordcount/WordCountProgram.java      | 12 +--
 .../flink/AbstractFlinkTransformationProgram.java  | 26 +++---
 .../flink/TransformationFlinkInit.java             |  8 +-
 .../transformation/flink/config/ConfigKeys.java    | 12 +--
 .../flink/config/TransformationFlinkConfig.java    | 80 ------------------
 .../boilerplate/BoilerplateController.java         |  3 +-
 .../processor/boilerplate/BoilerplateProgram.java  | 12 +--
 .../converter/FieldConverterController.java        |  9 +-
 .../processor/converter/FieldConverterProgram.java | 12 +--
 .../processor/hasher/FieldHasherController.java    |  8 +-
 .../flink/processor/hasher/FieldHasherProgram.java | 12 +--
 .../processor/mapper/FieldMapperController.java    |  2 +-
 .../flink/processor/mapper/FieldMapperProgram.java | 12 +--
 .../MeasurementUnitConverterController.java        |  2 +-
 .../MeasurementUnitConverterProgram.java           | 12 +--
 .../processor/rename/FieldRenamerController.java   |  8 +-
 .../processor/rename/FieldRenamerProgram.java      | 12 +--
 .../processor/converter/TestConverterProgram.java  |  9 +-
 .../processor/hasher/TestFieldHasherProgram.java   | 19 ++---
 .../flink/processor/rename/TestRenameProgram.java  |  9 +-
 .../sinks/databases/flink/DatabasesFlinkInit.java  |  7 ++
 .../sinks/databases/flink/config/ConfigKeys.java   | 15 ++--
 .../flink/config/DatabasesFlinkConfig.java         | 95 ----------------------
 .../elasticsearch/ElasticSearchController.java     |  8 +-
 .../elasticsearch/ElasticSearchParameters.java     | 18 +++-
 .../flink/elasticsearch/ElasticSearchProgram.java  | 34 +++++---
 .../svcdiscovery/consul/ConsulSpConfig.java        |  1 +
 .../distributed/runtime/DistributedRuntime.java    | 11 ++-
 .../wrapper/flink/FlinkDataProcessorRuntime.java   | 31 +++----
 .../wrapper/flink/FlinkDataSinkRuntime.java        | 31 +++----
 .../wrapper/flink/FlinkDeploymentConfig.java       | 17 +++-
 .../flink/FlinkMiniClusterDeploymentConfig.java    |  8 ++
 .../streampipes/wrapper/flink/FlinkRuntime.java    | 45 +++-------
 .../runtime/EventProcessorRuntimeParams.java       |  4 +-
 102 files changed, 560 insertions(+), 1228 deletions(-)

diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
index 87ba54c..9dd2da1 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -29,11 +29,14 @@ import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
 import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
 import org.apache.streampipes.model.mail.SpEmail;
 
+import java.io.Serializable;
+
 public class StreamPipesClient implements SupportsPipelineApi,
         SupportsPipelineElementTemplateApi,
         SupportsDataSinkApi,
         SupportsDataStreamApi,
-        SupportsDataProcessorApi {
+        SupportsDataProcessorApi,
+        Serializable {
 
   private static final Integer SP_DEFAULT_PORT = 80;
 
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/config/ConfigExtractor.java b/streampipes-container/src/main/java/org/apache/streampipes/container/config/ConfigExtractor.java
index e5d9bd1..93d050c 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/config/ConfigExtractor.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/config/ConfigExtractor.java
@@ -20,7 +20,9 @@ package org.apache.streampipes.container.config;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.svcdiscovery.consul.ConsulSpConfig;
 
-public class ConfigExtractor {
+import java.io.Serializable;
+
+public class ConfigExtractor implements Serializable {
 
   private SpConfig config;
 
diff --git a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java b/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java
deleted file mode 100644
index e807572..0000000
--- a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum Config implements PeConfig {
-  INSTANCE;
-
-  private final SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-  private final static String SERVICE_ID = "pe/org.apache.streampipes.processors.all.flink";
-  private final static String SERVICE_NAME = "Processors Flink (Bundle)";
-  private final static String SERVICE_CONTAINER_NAME = "pipeline-elements-all-flink";
-
-  Config() {
-    config = SpServiceDiscovery.getSpConfig(SERVICE_ID);
-    config.register(ConfigKeys.HOST, SERVICE_CONTAINER_NAME, "Data processor host");
-    config.register(ConfigKeys.PORT, 8090, "Data processor port");
-    config.register(ConfigKeys.SERVICE_NAME, SERVICE_NAME, "Data processor service name");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
-    config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getFlinkDebug() {
-    return config.getBoolean(ConfigKeys.FLINK_DEBUG);
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  @Override
-  public String getId() {
-    return SERVICE_ID;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-
-
-}
diff --git a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java
deleted file mode 100644
index cacc5ec..0000000
--- a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.flink.config;
-
-public class ConfigKeys {
-  final static String HOST = "SP_HOST";
-  final static String PORT = "SP_PORT";
-  final static String SERVICE_NAME = "SP_SERVICE_NAME";
-  final static String FLINK_HOST = "SP_FLINK_HOST";
-  final static String FLINK_PORT = "SP_FLINK_PORT";
-  final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
-}
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AbstractAggregationProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AbstractAggregationProgram.java
index 02706c3..c4c7760 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AbstractAggregationProgram.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AbstractAggregationProgram.java
@@ -17,25 +17,31 @@
  */
 package org.apache.streampipes.processors.aggregation.flink;
 
-import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.processors.aggregation.flink.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class AbstractAggregationProgram<B extends EventProcessorBindingParams> extends FlinkDataProcessorRuntime<B> {
 
-  public AbstractAggregationProgram(B params, boolean debug) {
-    super(params, debug);
-  }
-
-  public AbstractAggregationProgram(B params) {
-    super(params, false);
+  public AbstractAggregationProgram(B params,
+                                    ConfigExtractor configExtractor,
+                                    StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
-  protected FlinkDeploymentConfig getDeploymentConfig() {
-    return new FlinkDeploymentConfig(AggregationFlinkConfig.JAR_FILE,
-            AggregationFlinkConfig.INSTANCE.getFlinkHost(), AggregationFlinkConfig.INSTANCE.getFlinkPort());
+  protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+    SpConfig config = configExtractor.getConfig();
+    return new FlinkDeploymentConfig(config.getString(
+            ConfigKeys.FLINK_JAR_FILE_LOC),
+            config.getString(ConfigKeys.FLINK_HOST),
+            config.getInteger(ConfigKeys.FLINK_PORT),
+            config.getBoolean(ConfigKeys.DEBUG)
+    );
   }
 
 }
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
index f276485..18228fa 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.processors.aggregation.flink.config.ConfigKeys;
 import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
 import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
 import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
@@ -35,13 +36,15 @@ import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventR
 
 public class AggregationFlinkInit extends StandaloneModelSubmitter {
 
+  public static final String ServiceGroup = "org.apache.streampipes.processors.aggregation.flink";
+
   public static void main(String[] args) {
     new AggregationFlinkInit().init();
   }
 
   @Override
   public SpServiceDefinition provideServiceDefinition() {
-    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.aggregation.flink",
+    return SpServiceDefinitionBuilder.create(ServiceGroup,
                     "Processors Aggregation Flink",
                     "",
                     8090)
@@ -58,6 +61,10 @@ public class AggregationFlinkInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+            .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+            .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+            .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
             .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/config/AggregationFlinkConfig.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/config/AggregationFlinkConfig.java
deleted file mode 100644
index c925418..0000000
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/config/AggregationFlinkConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.aggregation.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum AggregationFlinkConfig implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
-  private final static String service_id = "pe/org.apache.streampipes.processors.aggregation.flink";
-  private final static String service_name = "Processors TimeAggregation Flink";
-  private final static String service_container_name = "processors-aggregation-flink";
-
-
-  AggregationFlinkConfig() {
-    config = SpServiceDiscovery.getSpConfig(service_id);
-
-    config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component");
-    config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Port for the flink cluster");
-
-    config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
-    config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getDebug() {
-    return config.getBoolean(ConfigKeys.DEBUG);
-  }
-
-  @Override
-  public String getId() {
-    return service_id;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/config/ConfigKeys.java
index a6522d0..8bd9cb7 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/config/ConfigKeys.java
@@ -19,10 +19,8 @@
 package org.apache.streampipes.processors.aggregation.flink.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String FLINK_HOST = "SP_FLINK_HOST";
-    final static String FLINK_PORT = "SP_FLINK_PORT";
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
-    final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_HOST = "SP_FLINK_HOST";
+    public final static String FLINK_PORT = "SP_FLINK_PORT";
+    public final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
index d8baeca..945f7c2 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
@@ -30,7 +30,6 @@ import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
@@ -132,11 +131,12 @@ public class AggregationController extends FlinkDataProcessorDeclarer<Aggregatio
             selectProperties,
             timeCountWindow.equals(TIME_WINDOW_OPTION));
 
-    return new AggregationProgram(staticParam, AggregationFlinkConfig.INSTANCE.getDebug());
+    return new AggregationProgram(staticParam, configExtractor, streamPipesClient);
   }
 
   @Override
-  public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement, ProcessingElementParameterExtractor parameterExtractor) throws SpRuntimeException {
+  public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement,
+                                           ProcessingElementParameterExtractor parameterExtractor) throws SpRuntimeException {
 
     EventSchema eventSchema = processingElement.getInputStreams().get(0).getEventSchema();
 
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationProgram.java
index 8da997b..37d470f 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationProgram.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationProgram.java
@@ -24,6 +24,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.aggregation.flink.AbstractAggregationProgram;
 
@@ -33,8 +35,10 @@ import java.util.Map;
 
 public class AggregationProgram extends AbstractAggregationProgram<AggregationParameters> {
 
-  public AggregationProgram(AggregationParameters params, boolean debug) {
-    super(params, debug);
+  public AggregationProgram(AggregationParameters params,
+                            ConfigExtractor configExtractor,
+                            StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
     setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
   }
 
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/CountController.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/CountController.java
index 1e757b1..1e78def 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/CountController.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/CountController.java
@@ -24,17 +24,10 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
@@ -85,7 +78,7 @@ public class CountController extends FlinkDataProcessorDeclarer<CountParameters>
     CountParameters staticParam = new CountParameters(graph, timeWindowSize, scale,
             fieldToCount);
 
-    return new CountProgram(staticParam, AggregationFlinkConfig.INSTANCE.getDebug());
+    return new CountProgram(staticParam, configExtractor, streamPipesClient);
 
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/CountProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/CountProgram.java
index 6193d1b..6e6a1f7 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/CountProgram.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/CountProgram.java
@@ -20,13 +20,17 @@ package org.apache.streampipes.processors.aggregation.flink.processor.count;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.aggregation.flink.AbstractAggregationProgram;
 
 public class CountProgram extends AbstractAggregationProgram<CountParameters> {
 
-  public CountProgram(CountParameters params, boolean debug) {
-    super(params, debug);
+  public CountProgram(CountParameters params,
+                      ConfigExtractor configExtractor,
+                      StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
     setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
   }
 
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/eventcount/EventCountController.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/eventcount/EventCountController.java
index b5f3e60..161321e 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/eventcount/EventCountController.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/eventcount/EventCountController.java
@@ -22,17 +22,10 @@ import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
@@ -77,6 +70,6 @@ public class EventCountController extends FlinkDataProcessorDeclarer<EventCountP
     String scale = extractor.selectedSingleValueInternalName(SCALE_KEY, String.class);
     EventCountParameters staticParam = new EventCountParameters(graph, timeWindowSize, scale);
 
-    return new EventCountProgram(staticParam, AggregationFlinkConfig.INSTANCE.getDebug());
+    return new EventCountProgram(staticParam, configExtractor, streamPipesClient);
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/eventcount/EventCountProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/eventcount/EventCountProgram.java
index 1b85ec5..e7b80c6 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/eventcount/EventCountProgram.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/eventcount/EventCountProgram.java
@@ -18,14 +18,18 @@
 package org.apache.streampipes.processors.aggregation.flink.processor.eventcount;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.aggregation.flink.AbstractAggregationProgram;
 import org.apache.streampipes.processors.aggregation.flink.processor.count.TimeWindowConverter;
 
 public class EventCountProgram extends AbstractAggregationProgram<EventCountParameters> {
 
-  public EventCountProgram(EventCountParameters params, boolean debug) {
-    super(params, debug);
+  public EventCountProgram(EventCountParameters params,
+                           ConfigExtractor configExtractor,
+                           StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/EventRateController.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/EventRateController.java
index d58d76c..f00c130 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/EventRateController.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/EventRateController.java
@@ -23,15 +23,10 @@ import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
@@ -66,7 +61,7 @@ public class EventRateController extends FlinkDataProcessorDeclarer<EventRatePar
 
     EventRateParameter staticParam = new EventRateParameter(graph, avgRate);
 
-    return new EventRateProgram(staticParam, AggregationFlinkConfig.INSTANCE.getDebug());
+    return new EventRateProgram(staticParam, configExtractor, streamPipesClient);
 
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/EventRateProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/EventRateProgram.java
index 7f89fca..eb66949 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/EventRateProgram.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/EventRateProgram.java
@@ -22,13 +22,17 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.aggregation.flink.AbstractAggregationProgram;
 
 public class EventRateProgram extends AbstractAggregationProgram<EventRateParameter> {
 
-  public EventRateProgram(EventRateParameter params, boolean debug) {
-    super(params, debug);
+  public EventRateProgram(EventRateParameter params,
+                          ConfigExtractor configExtractor,
+                          StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
     setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
   }
 
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/TestTimeAggregationProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/TestTimeAggregationProgram.java
index a17b7f7..34df33a 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/TestTimeAggregationProgram.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/TestTimeAggregationProgram.java
@@ -22,10 +22,12 @@ import io.flinkspector.datastream.DataStreamTestBase;
 import io.flinkspector.datastream.input.EventTimeInput;
 import io.flinkspector.datastream.input.EventTimeInputBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
 import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.junit.Ignore;
+import org.junit.Test;
 
 import java.util.Arrays;
 
@@ -46,7 +48,8 @@ public class TestTimeAggregationProgram extends DataStreamTestBase {
   @Test
   public void testAggregationProgram() {
     AggregationParameters params = makeParams();
-    AggregationProgram program = new AggregationProgram(params, true);
+    ConfigExtractor configExtractor = ConfigExtractor.from(AggregationFlinkInit.ServiceGroup);
+    AggregationProgram program = new AggregationProgram(params, configExtractor, null);
     AggregationTestData testData = new AggregationTestData();
 
     DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/count/TestCountProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/count/TestCountProgram.java
index fc3e42d..9df9f98 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/count/TestCountProgram.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/count/TestCountProgram.java
@@ -22,10 +22,12 @@ import io.flinkspector.datastream.DataStreamTestBase;
 import io.flinkspector.datastream.input.EventTimeInput;
 import io.flinkspector.datastream.input.EventTimeInputBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
 import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.junit.Ignore;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -60,8 +62,8 @@ public class TestCountProgram extends DataStreamTestBase {
           expected) {
     CountParameters params =
             new CountParameters(InvocationGraphGenerator.makeEmptyInvocation(new CountController().declareModel()), 10,"SECONDS", "field");
-
-    CountProgram program = new CountProgram(params, true);
+    ConfigExtractor configExtractor = ConfigExtractor.from(AggregationFlinkInit.ServiceGroup);
+    CountProgram program = new CountProgram(params, configExtractor, null);
 
     DataStream<Event> stream = program.getApplicationLogic(createTestStream(input));
 
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/TestRateProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/TestRateProgram.java
index 99425ac..794b319 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/TestRateProgram.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/TestRateProgram.java
@@ -22,12 +22,14 @@ import io.flinkspector.datastream.DataStreamTestBase;
 import io.flinkspector.datastream.input.EventTimeInput;
 import io.flinkspector.datastream.input.EventTimeInputBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -70,8 +72,9 @@ public class TestRateProgram extends DataStreamTestBase {
   @Test
   public void testRateProgram() {
     EventRateParameter params = new EventRateParameter(InvocationGraphGenerator.makeEmptyInvocation(new EventRateController().declareModel()), timeWindowSize);
+    ConfigExtractor configExtractor = ConfigExtractor.from(AggregationFlinkInit.ServiceGroup);
 
-    EventRateProgram program = new EventRateProgram(params, true);
+    EventRateProgram program = new EventRateProgram(params, configExtractor, null);
 
     DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData
             (numEvents, waitTime, timeUnit)));
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/AbstractEnricherProgram.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/AbstractEnricherProgram.java
index 9a55db7..cdb004b 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/AbstractEnricherProgram.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/AbstractEnricherProgram.java
@@ -17,25 +17,31 @@
  */
 package org.apache.streampipes.processors.enricher.flink;
 
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.processors.enricher.flink.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class AbstractEnricherProgram<B extends EventProcessorBindingParams> extends FlinkDataProcessorRuntime<B> {
 
-  public AbstractEnricherProgram(B params, boolean debug) {
-    super(params, debug);
-  }
-
-  public AbstractEnricherProgram(B params) {
-    super(params, false);
+  public AbstractEnricherProgram(B params,
+                                 ConfigExtractor configExtractor,
+                                 StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
-  protected FlinkDeploymentConfig getDeploymentConfig() {
-    return new FlinkDeploymentConfig(EnricherFlinkConfig.JAR_FILE,
-            EnricherFlinkConfig.INSTANCE.getFlinkHost(), EnricherFlinkConfig.INSTANCE.getFlinkPort());
+  protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+    SpConfig config = configExtractor.getConfig();
+    return new FlinkDeploymentConfig(config.getString(
+            ConfigKeys.FLINK_JAR_FILE_LOC),
+            config.getString(ConfigKeys.FLINK_HOST),
+            config.getInteger(ConfigKeys.FLINK_PORT),
+            config.getBoolean(ConfigKeys.DEBUG)
+    );
   }
 
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
index eb6b46a..2d36358a 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.processors.enricher.flink.config.ConfigKeys;
 import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
 import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
 import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
@@ -60,6 +61,10 @@ public class EnricherFlinkInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+            .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+            .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+            .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
             .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/config/ConfigKeys.java
index 81d4200..52f5bc3 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/config/ConfigKeys.java
@@ -19,10 +19,8 @@
 package org.apache.streampipes.processors.enricher.flink.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String FLINK_HOST = "SP_FLINK_HOST";
-    final static String FLINK_PORT = "SP_FLINK_PORT";
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
-    final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_HOST = "SP_FLINK_HOST";
+    public final static String FLINK_PORT = "SP_FLINK_PORT";
+    public final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/config/EnricherFlinkConfig.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/config/EnricherFlinkConfig.java
deleted file mode 100644
index a1b3bd5..0000000
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/config/EnricherFlinkConfig.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.enricher.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum EnricherFlinkConfig implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
-  private final static String service_id = "pe/org.apache.streampipes.processors.enricher.flink";
-  private final static String service_name = "Processors Enricher Flink";
-  private final static String service_container_name = "processors-enricher-flink";
-
-  EnricherFlinkConfig() {
-    config = SpServiceDiscovery.getSpConfig(service_id);
-
-    config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component");
-    config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Port for the flink cluster");
-
-    config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
-    config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getDebug() {
-    return config.getBoolean(ConfigKeys.DEBUG);
-  }
-
-  @Override
-  public String getId() {
-    return service_id;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/mathop/MathOpController.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/mathop/MathOpController.java
index 290d50b..f5c4f2e 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/mathop/MathOpController.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/mathop/MathOpController.java
@@ -24,22 +24,11 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.Operation;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationAddition;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationDivide;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationModulo;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationMultiply;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationSubtracting;
+import org.apache.streampipes.processors.enricher.flink.processor.math.operation.*;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
@@ -104,6 +93,6 @@ public class MathOpController extends FlinkDataProcessorDeclarer<MathOpParameter
 
     MathOpParameters parameters = new MathOpParameters(graph, arithmeticOperation, leftOperand, rightOperand, RESULT_FIELD);
 
-    return new MathOpProgram(parameters, EnricherFlinkConfig.INSTANCE.getDebug());
+    return new MathOpProgram(parameters, configExtractor, streamPipesClient);
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/mathop/MathOpProgram.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/mathop/MathOpProgram.java
index dc8412b..bb61a3e 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/mathop/MathOpProgram.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/mathop/MathOpProgram.java
@@ -19,13 +19,17 @@
 package org.apache.streampipes.processors.enricher.flink.processor.math.mathop;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.enricher.flink.AbstractEnricherProgram;
 
 public class MathOpProgram extends AbstractEnricherProgram<MathOpParameters> {
 
-    public MathOpProgram(MathOpParameters params, boolean debug) {
-        super(params, debug);
+    public MathOpProgram(MathOpParameters params,
+                         ConfigExtractor configExtractor,
+                         StreamPipesClient streamPipesClient) {
+        super(params, configExtractor, streamPipesClient);
     }
 
     @Override
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/staticmathop/StaticMathOpController.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/staticmathop/StaticMathOpController.java
index ac35293..6cf6bbd 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/staticmathop/StaticMathOpController.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/staticmathop/StaticMathOpController.java
@@ -24,21 +24,11 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.Operation;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationAddition;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationDivide;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationModulo;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationMultiply;
-import org.apache.streampipes.processors.enricher.flink.processor.math.operation.OperationSubtracting;
+import org.apache.streampipes.processors.enricher.flink.processor.math.operation.*;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
@@ -99,7 +89,7 @@ public class StaticMathOpController extends FlinkDataProcessorDeclarer<StaticMat
 
     StaticMathOpParameters parameters = new StaticMathOpParameters(graph, arithmeticOperation, leftOperand, rightOperand, RESULT_FIELD);
 
-    return new StaticMathOpProgram(parameters, EnricherFlinkConfig.INSTANCE.getDebug());
+    return new StaticMathOpProgram(parameters, configExtractor, streamPipesClient);
 
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/staticmathop/StaticMathOpProgram.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/staticmathop/StaticMathOpProgram.java
index 326d3c2..57c0dcf 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/staticmathop/StaticMathOpProgram.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/math/staticmathop/StaticMathOpProgram.java
@@ -19,13 +19,17 @@
 package org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.enricher.flink.AbstractEnricherProgram;
 
 public class StaticMathOpProgram extends AbstractEnricherProgram<StaticMathOpParameters> {
 
-    public StaticMathOpProgram(StaticMathOpParameters params, boolean debug) {
-        super(params, debug);
+    public StaticMathOpProgram(StaticMathOpParameters params,
+                               ConfigExtractor configExtractor,
+                               StreamPipesClient streamPipesClient) {
+        super(params, configExtractor, streamPipesClient);
     }
 
     @Override
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/timestamp/TimestampController.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/timestamp/TimestampController.java
index 2dbb795..5911f01 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/timestamp/TimestampController.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/timestamp/TimestampController.java
@@ -22,15 +22,10 @@ import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
@@ -65,7 +60,7 @@ public class TimestampController extends FlinkDataProcessorDeclarer<TimestampPar
             APPEND_PROPERTY);
 
 
-    return new TimestampProgram(staticParam, EnricherFlinkConfig.INSTANCE.getDebug());
+    return new TimestampProgram(staticParam, configExtractor, streamPipesClient);
   }
 
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/timestamp/TimestampProgram.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/timestamp/TimestampProgram.java
index b7795b8..6c384ed 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/timestamp/TimestampProgram.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/timestamp/TimestampProgram.java
@@ -19,13 +19,17 @@
 package org.apache.streampipes.processors.enricher.flink.processor.timestamp;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.enricher.flink.AbstractEnricherProgram;
 
 public class TimestampProgram extends AbstractEnricherProgram<TimestampParameters> {
 
-	public TimestampProgram(TimestampParameters params, boolean debug) {
-		super(params, debug);
+	public TimestampProgram(TimestampParameters params,
+							ConfigExtractor configExtractor,
+							StreamPipesClient streamPipesClient) {
+		super(params, configExtractor, streamPipesClient);
 	}
 
 	@Override
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/trigonometry/TrigonometryController.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/trigonometry/TrigonometryController.java
index 417302c..29735c8 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/trigonometry/TrigonometryController.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/trigonometry/TrigonometryController.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -82,6 +81,6 @@ public class TrigonometryController extends FlinkDataProcessorDeclarer<Trigonome
 
         TrigonometryParameters parameters = new TrigonometryParameters(graph, operand, trigonometryFunction, RESULT_FIELD);
 
-        return new TrigonometryProgram(parameters, EnricherFlinkConfig.INSTANCE.getDebug());
+        return new TrigonometryProgram(parameters, configExtractor, streamPipesClient);
     }
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/trigonometry/TrigonometryProgram.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/trigonometry/TrigonometryProgram.java
index 6761a30..ceb2d83 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/trigonometry/TrigonometryProgram.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/trigonometry/TrigonometryProgram.java
@@ -19,13 +19,17 @@
 package org.apache.streampipes.processors.enricher.flink.processor.trigonometry;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.enricher.flink.AbstractEnricherProgram;
 
 public class TrigonometryProgram extends AbstractEnricherProgram<TrigonometryParameters> {
 
-    public TrigonometryProgram(TrigonometryParameters params, boolean debug) {
-        super(params, debug);
+    public TrigonometryProgram(TrigonometryParameters params,
+                               ConfigExtractor configExtractor,
+                               StreamPipesClient streamPipesClient) {
+        super(params, configExtractor, streamPipesClient);
     }
 
     @Override
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/urldereferencing/UrlDereferencingController.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/urldereferencing/UrlDereferencingController.java
index a27aa60..6ef9444 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/urldereferencing/UrlDereferencingController.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/urldereferencing/UrlDereferencingController.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -75,6 +74,6 @@ public class UrlDereferencingController extends FlinkDataProcessorDeclarer<UrlDe
 */
         UrlDereferencingParameter staticParam = new UrlDereferencingParameter(graph, urlString, APPEND_HTML);
 
-        return  new UrlDereferencingProgram(staticParam, EnricherFlinkConfig.INSTANCE.getDebug());
+        return  new UrlDereferencingProgram(staticParam, configExtractor, streamPipesClient);
     }
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/urldereferencing/UrlDereferencingProgram.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/urldereferencing/UrlDereferencingProgram.java
index 8e94a59..adae9fd 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/urldereferencing/UrlDereferencingProgram.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/processor/urldereferencing/UrlDereferencingProgram.java
@@ -19,13 +19,17 @@
 package org.apache.streampipes.processors.enricher.flink.processor.urldereferencing;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.enricher.flink.AbstractEnricherProgram;
 
 public class UrlDereferencingProgram extends AbstractEnricherProgram<UrlDereferencingParameter> {
 
-    public UrlDereferencingProgram(UrlDereferencingParameter params, boolean debug) {
-        super(params, debug);
+    public UrlDereferencingProgram(UrlDereferencingParameter params,
+                                   ConfigExtractor configExtractor,
+                                   StreamPipesClient streamPipesClient) {
+        super(params, configExtractor, streamPipesClient);
     }
 
     @Override
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/AbstractGeoProgram.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/AbstractGeoProgram.java
index 41e8c67..6f54221 100644
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/AbstractGeoProgram.java
+++ b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/AbstractGeoProgram.java
@@ -17,26 +17,31 @@
  */
 package org.apache.streampipes.processor.geo.flink;
 
-import org.apache.streampipes.processor.geo.flink.config.GeoFlinkConfig;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.processor.geo.flink.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class AbstractGeoProgram<B extends EventProcessorBindingParams> extends FlinkDataProcessorRuntime<B> {
 
-  public AbstractGeoProgram(B params, boolean debug) {
-    super(params, debug);
-  }
-
-  public AbstractGeoProgram(B params) {
-    super(params, false);
+  public AbstractGeoProgram(B params,
+                            ConfigExtractor configExtractor,
+                            StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
-  protected FlinkDeploymentConfig getDeploymentConfig() {
-    return new FlinkDeploymentConfig(GeoFlinkConfig.JAR_FILE,
-            GeoFlinkConfig.INSTANCE.getFlinkHost(), GeoFlinkConfig.INSTANCE.getFlinkPort());
+  protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+    SpConfig config = configExtractor.getConfig();
+    return new FlinkDeploymentConfig(config.getString(
+            ConfigKeys.FLINK_JAR_FILE_LOC),
+            config.getString(ConfigKeys.FLINK_HOST),
+            config.getInteger(ConfigKeys.FLINK_PORT),
+            config.getBoolean(ConfigKeys.DEBUG)
+    );
   }
-
 }
 
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
index 3e5a8ed..4759969 100644
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.processor.geo.flink.config.ConfigKeys;
 import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
 
 public class GeoFlinkInit extends StandaloneModelSubmitter {
@@ -52,6 +53,10 @@ public class GeoFlinkInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+            .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+            .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+            .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
             .build();
   }
 
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/config/ConfigKeys.java
index 9e6e713..91c1a1c 100644
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/config/ConfigKeys.java
@@ -19,10 +19,8 @@
 package org.apache.streampipes.processor.geo.flink.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String FLINK_HOST = "SP_FLINK_HOST";
-    final static String FLINK_PORT = "SP_FLINK_PORT";
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
-    final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_HOST = "SP_FLINK_HOST";
+    public final static String FLINK_PORT = "SP_FLINK_PORT";
+    public final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/config/GeoFlinkConfig.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/config/GeoFlinkConfig.java
deleted file mode 100644
index 498bfb1..0000000
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/config/GeoFlinkConfig.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processor.geo.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum GeoFlinkConfig implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
-  private final static String service_id = "pe/org.apache.streampipes.processors.geo.flink";
-  private final static String service_name = "Processors Geo Flink";
-  private final static String service_container_name = "processors-geo-flink";
-  GeoFlinkConfig() {
-    config = SpServiceDiscovery.getSpConfig(service_id);
-
-    config.register(ConfigKeys.HOST, service_container_name, "Hostname for the geo flink component");
-    config.register(ConfigKeys.PORT, 8090, "Port for the geo flink component");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Port for the flink cluster");
-
-    config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
-    config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getDebug() {
-    return config.getBoolean(ConfigKeys.DEBUG);
-  }
-
-  @Override
-  public String getId() {
-    return service_id;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/processor/gridenricher/SpatialGridEnrichmentController.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/processor/gridenricher/SpatialGridEnrichmentController.java
index d2e8948..eecdcf7 100644
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/processor/gridenricher/SpatialGridEnrichmentController.java
+++ b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/processor/gridenricher/SpatialGridEnrichmentController.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processor.geo.flink.config.GeoFlinkConfig;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
@@ -96,7 +95,7 @@ public class SpatialGridEnrichmentController extends FlinkDataProcessorDeclarer<
     SpatialGridEnrichmentParameters params = new SpatialGridEnrichmentParameters(graph,
             enrichmentSettings);
 
-    return new SpatialGridEnrichmentProgram(params, GeoFlinkConfig.INSTANCE.getDebug());
+    return new SpatialGridEnrichmentProgram(params, configExtractor, streamPipesClient);
 
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/processor/gridenricher/SpatialGridEnrichmentProgram.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/processor/gridenricher/SpatialGridEnrichmentProgram.java
index 9a15fb1..8aee507 100644
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/processor/gridenricher/SpatialGridEnrichmentProgram.java
+++ b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/processor/gridenricher/SpatialGridEnrichmentProgram.java
@@ -19,13 +19,17 @@
 package org.apache.streampipes.processor.geo.flink.processor.gridenricher;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processor.geo.flink.AbstractGeoProgram;
 
 public class SpatialGridEnrichmentProgram extends AbstractGeoProgram<SpatialGridEnrichmentParameters> {
 
-  public SpatialGridEnrichmentProgram(SpatialGridEnrichmentParameters params, boolean debug) {
-    super(params, debug);
+  public SpatialGridEnrichmentProgram(SpatialGridEnrichmentParameters params,
+                                      ConfigExtractor configExtractor,
+                                      StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/AbstractPatternDetectionProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/AbstractPatternDetectionProgram.java
index 7711610..e02200c 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/AbstractPatternDetectionProgram.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/AbstractPatternDetectionProgram.java
@@ -19,25 +19,31 @@ package org.apache.streampipes.processors.pattern.detection.flink;
 
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.streampipes.processors.pattern.detection.flink.config.PatternDetectionFlinkConfig;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.processors.pattern.detection.flink.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class AbstractPatternDetectionProgram<B extends EventProcessorBindingParams> extends FlinkDataProcessorRuntime<B> {
 
-  public AbstractPatternDetectionProgram(B params, boolean debug) {
-    super(params, debug);
-  }
-
-  public AbstractPatternDetectionProgram(B params) {
-    super(params, false);
+  public AbstractPatternDetectionProgram(B params,
+                                         ConfigExtractor configExtractor,
+                                         StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
-  protected FlinkDeploymentConfig getDeploymentConfig() {
-    return new FlinkDeploymentConfig(PatternDetectionFlinkConfig.JAR_FILE,
-            PatternDetectionFlinkConfig.INSTANCE.getFlinkHost(), PatternDetectionFlinkConfig.INSTANCE.getFlinkPort());
+  protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+    SpConfig config = configExtractor.getConfig();
+    return new FlinkDeploymentConfig(config.getString(
+            ConfigKeys.FLINK_JAR_FILE_LOC),
+            config.getString(ConfigKeys.FLINK_HOST),
+            config.getInteger(ConfigKeys.FLINK_PORT),
+            config.getBoolean(ConfigKeys.DEBUG)
+    );
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
index 1fbb189..c78b186 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.processors.pattern.detection.flink.config.ConfigKeys;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
@@ -35,13 +36,15 @@ import org.apache.streampipes.processors.pattern.detection.flink.processor.seque
 
 public class PatternDetectionFlinkInit extends StandaloneModelSubmitter {
 
+  public static final String ServiceGroup = "org.apache.streampipes.processors.patterndetection.flink";
+
   public static void main(String[] args) {
     new PatternDetectionFlinkInit().init();
   }
 
   @Override
   public SpServiceDefinition provideServiceDefinition() {
-    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.patterndetection.flink",
+    return SpServiceDefinitionBuilder.create(ServiceGroup,
                     "Processors Pattern Detection Flink",
                     "",
                     8090)
@@ -58,6 +61,10 @@ public class PatternDetectionFlinkInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+            .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+            .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+            .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
             .build();
   }
 
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/ConfigKeys.java
index e2ea4c2..dc6f471 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/ConfigKeys.java
@@ -19,10 +19,8 @@
 package org.apache.streampipes.processors.pattern.detection.flink.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String FLINK_HOST = "SP_FLINK_HOST";
-    final static String FLINK_PORT = "SP_FLINK_PORT";
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
-    final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_HOST = "SP_FLINK_HOST";
+    public final static String FLINK_PORT = "SP_FLINK_PORT";
+    public final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/PatternDetectionFlinkConfig.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/PatternDetectionFlinkConfig.java
deleted file mode 100644
index 47ffb22..0000000
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/PatternDetectionFlinkConfig.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.pattern.detection.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum PatternDetectionFlinkConfig implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
-  private final static String service_id = "pe/org.apache.streampipes.processors.pattern.detection.flink";
-  private final static String service_name = "Processors Pattern Detection Flink";
-  private final static String service_container_name = "processors-pattern-detection-flink";
-
-  PatternDetectionFlinkConfig() {
-    config = SpServiceDiscovery.getSpConfig(service_id);
-
-    config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component");
-    config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Port for the flink cluster");
-
-    config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
-    config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getDebug() {
-    return config.getBoolean(ConfigKeys.DEBUG);
-  }
-
-  @Override
-  public String getId() {
-    return service_id;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceController.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceController.java
index 3815c1d..2119071 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceController.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceController.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.processors.pattern.detection.flink.config.PatternDetectionFlinkConfig;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnit;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
@@ -78,6 +77,6 @@ public class AbsenceController extends FlinkDataProcessorDeclarer<AbsenceParamet
 
     AbsenceParameters params = new AbsenceParameters(graph, selectProperties, timeWindow, timeUnit);
 
-    return new AbsenceProgram(params, PatternDetectionFlinkConfig.INSTANCE.getDebug());
+    return new AbsenceProgram(params, configExtractor, streamPipesClient);
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceProgram.java
index 93148bd..e3ece2d 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceProgram.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceProgram.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnitConverter;
@@ -39,8 +41,10 @@ import java.util.Map;
 
 public class AbsenceProgram extends AbstractPatternDetectionProgram<AbsenceParameters> {
 
-  public AbsenceProgram(AbsenceParameters params, boolean debug) {
-    super(params, debug);
+  public AbsenceProgram(AbsenceParameters params,
+                        ConfigExtractor configExtractor,
+                        StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndController.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndController.java
index 4aa67d5..28c2d68 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndController.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndController.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.pattern.detection.flink.config.PatternDetectionFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -79,7 +78,7 @@ public class AndController extends FlinkDataProcessorDeclarer<AndParameters> {
     Integer timeWindow = extractor.singleValueParameter(TIME_WINDOW, Integer.class);
 
     AndParameters params = new AndParameters(graph, timeUnit, timeWindow, leftMappings, rightMappings);
-    return new AndProgram(params, PatternDetectionFlinkConfig.INSTANCE.getDebug());
+    return new AndProgram(params, configExtractor, streamPipesClient);
 
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndProgram.java
index d7ae7ce..547bb78 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndProgram.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndProgram.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram;
 
@@ -30,8 +32,10 @@ import java.util.List;
 
 public class AndProgram extends AbstractPatternDetectionProgram<AndParameters> {
 
-  public AndProgram(AndParameters params, boolean debug) {
-    super(params, debug);
+  public AndProgram(AndParameters params,
+                    ConfigExtractor configExtractor,
+                    StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionController.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionController.java
index e65185f..647e893 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionController.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionController.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.pattern.detection.flink.config.PatternDetectionFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -33,9 +32,6 @@ import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 
-/**
- * Created by riemer on 20.04.2017.
- */
 public class PeakDetectionController extends FlinkDataProcessorDeclarer<PeakDetectionParameters> {
 
   private static final String VALUE_TO_OBSERVE = "value-to-observe";
@@ -90,6 +86,6 @@ public class PeakDetectionController extends FlinkDataProcessorDeclarer<PeakDete
     PeakDetectionParameters params = new PeakDetectionParameters(sepa,
             valueToObserve, timestampMapping, groupBy, countWindowSize, lag, threshold, influence);
 
-    return new PeakDetectionProgram(params, PatternDetectionFlinkConfig.INSTANCE.getDebug());
+    return new PeakDetectionProgram(params, configExtractor, streamPipesClient);
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionProgram.java
index d5f893d..db83eae 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionProgram.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionProgram.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.utils.SlidingBatchWindow;
@@ -33,8 +35,10 @@ import java.util.List;
  */
 public class PeakDetectionProgram extends AbstractPatternDetectionProgram<PeakDetectionParameters> {
 
-  public PeakDetectionProgram(PeakDetectionParameters params, boolean debug) {
-    super(params, debug);
+  public PeakDetectionProgram(PeakDetectionParameters params,
+                              ConfigExtractor configExtractor,
+                              StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceController.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceController.java
index 4387473..911d830 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceController.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceController.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.processors.pattern.detection.flink.config.PatternDetectionFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -63,7 +62,7 @@ public class SequenceController extends FlinkDataProcessorDeclarer<SequenceParam
 
     SequenceParameters params = new SequenceParameters(graph, timeWindowSize, timeUnit);
 
-    return new SequenceProgram(params, PatternDetectionFlinkConfig.INSTANCE.getDebug());
+    return new SequenceProgram(params, configExtractor, streamPipesClient);
 
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceProgram.java
index b52e0ad..05207b8 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceProgram.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceProgram.java
@@ -20,13 +20,17 @@ package org.apache.streampipes.processors.pattern.detection.flink.processor.sequ
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram;
 
 public class SequenceProgram extends AbstractPatternDetectionProgram<SequenceParameters> {
 
-  public SequenceProgram(SequenceParameters params, boolean debug) {
-    super(params, debug);
+  public SequenceProgram(SequenceParameters params,
+                         ConfigExtractor configExtractor,
+                         StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java
index 0397e94..2a485ab 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java
@@ -17,28 +17,30 @@
  */
 package org.apache.streampipes.processors.pattern.detection.processor.absence;
 
-import static org.hamcrest.core.IsEqual.equalTo;
-
 import io.flinkspector.datastream.DataStreamTestBase;
 import io.flinkspector.datastream.input.EventTimeInput;
 import io.flinkspector.datastream.input.EventTimeInputBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceParameters;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceProgram;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnit;
 import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
+import static org.hamcrest.core.IsEqual.equalTo;
+
 @Ignore
 @RunWith(Parameterized.class)
 public class TestAbsence extends DataStreamTestBase {
@@ -75,7 +77,8 @@ public class TestAbsence extends DataStreamTestBase {
   public void testAbsenceProgram() {
     AbsenceParameters params = new AbsenceParameters(InvocationGraphGenerator.makeEmptyInvocation(new AbsenceController().declareModel()),  Arrays.asList("id", "timestamp", "value"), timeWindow, timeUnit);
 
-    AbsenceProgram program = new AbsenceProgram(params, true);
+    ConfigExtractor configExtractor = ConfigExtractor.from(PatternDetectionFlinkInit.ServiceGroup);
+    AbsenceProgram program = new AbsenceProgram(params, configExtractor, null);
 
     DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData(1, makeMap(), 0)), createTestStream(makeInputData(waitForMs, makeMap(), 1)));
 
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java
index d264640..c5402e2 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java
@@ -17,30 +17,32 @@
  */
 package org.apache.streampipes.processors.pattern.detection.processor.and;
 
-import static org.hamcrest.core.IsEqual.equalTo;
-
 import io.flinkspector.datastream.DataStreamTestBase;
 import io.flinkspector.datastream.input.EventTimeInput;
 import io.flinkspector.datastream.input.EventTimeInputBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndParameters;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndProgram;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnit;
 import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
+import static org.hamcrest.core.IsEqual.equalTo;
+
 @Ignore
 @RunWith(Parameterized.class)
 public class TestAnd extends DataStreamTestBase {
@@ -88,7 +90,8 @@ public class TestAnd extends DataStreamTestBase {
             new AndParameters(InvocationGraphGenerator.makeEmptyInvocation(description), timeUnit,
                     timeWindow, leftMapping, rightMapping);
 
-    AndProgram program = new AndProgram(params, true);
+    ConfigExtractor configExtractor = ConfigExtractor.from(PatternDetectionFlinkInit.ServiceGroup);
+    AndProgram program = new AndProgram(params, configExtractor, null);
 
     DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData(delayFirstEvent, makeMap("field1"))), createTestStream(makeInputData(delaySecondEvent, makeMap("field2"))));
 
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/AbstractStatisticsProgram.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/AbstractStatisticsProgram.java
index 486ee4d..a3d8174 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/AbstractStatisticsProgram.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/AbstractStatisticsProgram.java
@@ -17,25 +17,31 @@
  */
 package org.apache.streampipes.processors.statistics.flink;
 
-import org.apache.streampipes.processors.statistics.flink.config.StatisticsFlinkConfig;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.processors.statistics.flink.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class AbstractStatisticsProgram<B extends EventProcessorBindingParams> extends FlinkDataProcessorRuntime<B> {
 
-  public AbstractStatisticsProgram(B params, boolean debug) {
-    super(params, debug);
-  }
-
-  public AbstractStatisticsProgram(B params) {
-    super(params, false);
+  public AbstractStatisticsProgram(B params,
+                                   ConfigExtractor configExtractor,
+                                   StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
-  protected FlinkDeploymentConfig getDeploymentConfig() {
-    return new FlinkDeploymentConfig(StatisticsFlinkConfig.JAR_FILE,
-            StatisticsFlinkConfig.INSTANCE.getFlinkHost(), StatisticsFlinkConfig.INSTANCE.getFlinkPort());
+  protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+    SpConfig config = configExtractor.getConfig();
+    return new FlinkDeploymentConfig(config.getString(
+            ConfigKeys.FLINK_JAR_FILE_LOC),
+            config.getString(ConfigKeys.FLINK_HOST),
+            config.getInteger(ConfigKeys.FLINK_PORT),
+            config.getBoolean(ConfigKeys.DEBUG)
+    );
   }
 
 }
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
index 8ac42b8..6bb4b84 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.processors.statistics.flink.config.ConfigKeys;
 import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
 import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
 
@@ -54,6 +55,10 @@ public class StatisticsFlinkInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+            .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+            .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+            .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
             .build();
   }
 
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/ConfigKeys.java
index b2db189..c090a00 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/ConfigKeys.java
@@ -19,10 +19,8 @@
 package org.apache.streampipes.processors.statistics.flink.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String FLINK_HOST = "SP_FLINK_HOST";
-    final static String FLINK_PORT = "SP_FLINK_PORT";
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
-    final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_HOST = "SP_FLINK_HOST";
+    public final static String FLINK_PORT = "SP_FLINK_PORT";
+    public final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/StatisticsFlinkConfig.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/StatisticsFlinkConfig.java
deleted file mode 100644
index d240b4e..0000000
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/StatisticsFlinkConfig.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.statistics.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum StatisticsFlinkConfig implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
-  private final static String service_id = "pe/org.apache.streampipes.processors.statistics.flink";
-  private final static String service_name = "Statistics Processors Flink";
-  private final static String service_container_name = "processors-statistics-flink";
-
-  StatisticsFlinkConfig() {
-    config = SpServiceDiscovery.getSpConfig(service_id);
-
-    config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component");
-    config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Port for the flink cluster");
-
-    config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
-    config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getDebug() {
-    return config.getBoolean(ConfigKeys.DEBUG);
-  }
-
-  @Override
-  public String getId() {
-    return service_id;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryController.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryController.java
index b053cea..6a8e675 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryController.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryController.java
@@ -27,15 +27,10 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.statistics.flink.config.StatisticsFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.sdk.utils.Datatypes;
 import org.apache.streampipes.vocabulary.Statistics;
@@ -81,7 +76,7 @@ public class StatisticsSummaryController extends FlinkDataProcessorDeclarer<Stat
 
     StatisticsSummaryParameters params = new StatisticsSummaryParameters(graph, listPropertyMappings);
 
-    return new StatisticsSummaryProgram(params, StatisticsFlinkConfig.INSTANCE.getDebug());
+    return new StatisticsSummaryProgram(params, configExtractor, streamPipesClient);
 
   }
 
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryProgram.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryProgram.java
index 7129bda..0c0edc5 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryProgram.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryProgram.java
@@ -19,17 +19,17 @@
 package org.apache.streampipes.processors.statistics.flink.processor.stat.summary;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.statistics.flink.AbstractStatisticsProgram;
 
 public class StatisticsSummaryProgram extends AbstractStatisticsProgram<StatisticsSummaryParameters> {
 
-  public StatisticsSummaryProgram(StatisticsSummaryParameters params, boolean debug) {
-    super(params, debug);
-  }
-
-  public StatisticsSummaryProgram(StatisticsSummaryParameters params) {
-    super(params);
+  public StatisticsSummaryProgram(StatisticsSummaryParameters params,
+                                  ConfigExtractor configExtractor,
+                                  StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryControllerWindow.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryControllerWindow.java
index 071aab5..720f8a1 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryControllerWindow.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryControllerWindow.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.statistics.flink.config.StatisticsFlinkConfig;
 import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
@@ -108,7 +107,7 @@ public class StatisticsSummaryControllerWindow extends
     StatisticsSummaryParamsSerializable serializableParams = new StatisticsSummaryParamsSerializable
             (valueToObserve, timestampMapping, groupBy, (long) timeWindowSize, timeUnit);
 
-    return new StatisticsSummaryProgramWindow(params, serializableParams, StatisticsFlinkConfig.INSTANCE.getDebug());
+    return new StatisticsSummaryProgramWindow(params, serializableParams, configExtractor, streamPipesClient);
 
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryProgramWindow.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryProgramWindow.java
index cafb214..2e42e2a 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryProgramWindow.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryProgramWindow.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.statistics.flink.AbstractStatisticsProgram;
 import org.apache.streampipes.processors.statistics.flink.extensions.MapKeySelector;
@@ -35,14 +37,11 @@ public class StatisticsSummaryProgramWindow extends
 
   private StatisticsSummaryParamsSerializable serializableParams;
 
-  public StatisticsSummaryProgramWindow(StatisticsSummaryParametersWindow params, StatisticsSummaryParamsSerializable serializableParams, boolean debug) {
-    super(params, debug);
-    this.streamTimeCharacteristic = TimeCharacteristic.EventTime;
-    this.serializableParams = serializableParams;
-  }
-
-  public StatisticsSummaryProgramWindow(StatisticsSummaryParametersWindow params, StatisticsSummaryParamsSerializable serializableParams) {
-    super(params);
+  public StatisticsSummaryProgramWindow(StatisticsSummaryParametersWindow params,
+                                        StatisticsSummaryParamsSerializable serializableParams,
+                                        ConfigExtractor configExtractor,
+                                        StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
     this.streamTimeCharacteristic = TimeCharacteristic.EventTime;
     this.serializableParams = serializableParams;
   }
@@ -71,4 +70,4 @@ public class StatisticsSummaryProgramWindow extends
 
 
 
-}
\ No newline at end of file
+}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/AbstractTextMiningProgram.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/AbstractTextMiningProgram.java
index 575074d..1fe58e5 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/AbstractTextMiningProgram.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/AbstractTextMiningProgram.java
@@ -17,25 +17,31 @@
  */
 package org.apache.streampipes.processors.textmining.flink;
 
-import org.apache.streampipes.processors.textmining.flink.config.TextMiningFlinkConfig;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.processors.textmining.flink.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class AbstractTextMiningProgram<B extends EventProcessorBindingParams> extends FlinkDataProcessorRuntime<B> {
 
-  public AbstractTextMiningProgram(B params, boolean debug) {
-    super(params, debug);
-  }
-
-  public AbstractTextMiningProgram(B params) {
-    super(params, false);
+  public AbstractTextMiningProgram(B params,
+                                   ConfigExtractor configExtractor,
+                                   StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
-  protected FlinkDeploymentConfig getDeploymentConfig() {
-    return new FlinkDeploymentConfig(TextMiningFlinkConfig.JAR_FILE,
-            TextMiningFlinkConfig.INSTANCE.getFlinkHost(), TextMiningFlinkConfig.INSTANCE.getFlinkPort());
+  protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+    SpConfig config = configExtractor.getConfig();
+    return new FlinkDeploymentConfig(config.getString(
+            ConfigKeys.FLINK_JAR_FILE_LOC),
+            config.getString(ConfigKeys.FLINK_HOST),
+            config.getInteger(ConfigKeys.FLINK_PORT),
+            config.getBoolean(ConfigKeys.DEBUG)
+    );
   }
 
 }
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/TextMiningFlinkInit.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/TextMiningFlinkInit.java
index 29d4cbf..630be9d 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/TextMiningFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/TextMiningFlinkInit.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.processors.textmining.flink.config.ConfigKeys;
 import org.apache.streampipes.processors.textmining.flink.processor.wordcount.WordCountController;
 
 public class TextMiningFlinkInit extends StandaloneModelSubmitter {
@@ -52,6 +53,10 @@ public class TextMiningFlinkInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+            .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+            .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+            .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
             .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/ConfigKeys.java
index 28a4be1..e9ccd59 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/ConfigKeys.java
@@ -19,10 +19,8 @@
 package org.apache.streampipes.processors.textmining.flink.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String FLINK_HOST = "SP_FLINK_HOST";
-    final static String FLINK_PORT = "SP_FLINK_PORT";
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
-    final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_HOST = "SP_FLINK_HOST";
+    public final static String FLINK_PORT = "SP_FLINK_PORT";
+    public final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/TextMiningFlinkConfig.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/TextMiningFlinkConfig.java
deleted file mode 100644
index b4c410a..0000000
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/TextMiningFlinkConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum TextMiningFlinkConfig implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processors-textmining-flink.jar";
-
-  private final static String service_id = "pe/org.apache.streampipes.processors.textmining.flink";
-  private final static String service_name = "Processors Text Mining Flink";
-  private final static String service_container_name = "processors-textmining-flink";
-
-
-  TextMiningFlinkConfig() {
-    config = SpServiceDiscovery.getSpConfig(service_id);
-
-    config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component");
-    config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Port for the flink cluster");
-
-    config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
-    config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getDebug() {
-    return config.getBoolean(ConfigKeys.DEBUG);
-  }
-
-  @Override
-  public String getId() {
-    return service_id;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionController.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionController.java
index 644a7ef..77d96dd 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionController.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionController.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.textmining.flink.config.TextMiningFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -64,6 +63,6 @@ public class LanguageDetectionController extends FlinkDataProcessorDeclarer<Lang
                                                                            StreamPipesClient streamPipesClient) {
     String fieldName = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
 
-    return new LanguageDetectionProgram(new LanguageDetectionParameters(graph, fieldName), TextMiningFlinkConfig.INSTANCE.getDebug());
+    return new LanguageDetectionProgram(new LanguageDetectionParameters(graph, fieldName), configExtractor, streamPipesClient);
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionProgram.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionProgram.java
index 8c93e58..460afd2 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionProgram.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionProgram.java
@@ -18,17 +18,17 @@
 package org.apache.streampipes.processors.textmining.flink.processor.language;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.textmining.flink.AbstractTextMiningProgram;
 
 public class LanguageDetectionProgram extends AbstractTextMiningProgram<LanguageDetectionParameters> {
 
-  public LanguageDetectionProgram(LanguageDetectionParameters params, boolean debug) {
-    super(params, debug);
-  }
-
-  public LanguageDetectionProgram(LanguageDetectionParameters params) {
-    super(params);
+  public LanguageDetectionProgram(LanguageDetectionParameters params,
+                                  ConfigExtractor configExtractor,
+                                  StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountController.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountController.java
index 3e060aa..40d105a 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountController.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountController.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.textmining.flink.config.TextMiningFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -72,7 +71,7 @@ public class WordCountController extends FlinkDataProcessorDeclarer<WordCountPar
     String fieldName = extractor.mappingPropertyValue(WORD_COUNT_FIELD_KEY);
     Integer timeWindowValue = extractor.singleValueParameter(TIME_WINDOW_KEY, Integer.class);
 
-    return new WordCountProgram(new WordCountParameters(graph, fieldName, timeWindowValue), TextMiningFlinkConfig.INSTANCE.getDebug());
+    return new WordCountProgram(new WordCountParameters(graph, fieldName, timeWindowValue), configExtractor, streamPipesClient);
 
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountProgram.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountProgram.java
index b3109f2..883ddae 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountProgram.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountProgram.java
@@ -19,6 +19,8 @@
 package org.apache.streampipes.processors.textmining.flink.processor.wordcount;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.textmining.flink.AbstractTextMiningProgram;
 
@@ -26,12 +28,10 @@ import java.io.Serializable;
 
 public class WordCountProgram extends AbstractTextMiningProgram<WordCountParameters> implements Serializable {
 
-  public WordCountProgram(WordCountParameters params, boolean debug) {
-    super(params, debug);
-  }
-
-  public WordCountProgram(WordCountParameters params) {
-    super(params);
+  public WordCountProgram(WordCountParameters params,
+                          ConfigExtractor configExtractor,
+                          StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/AbstractFlinkTransformationProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/AbstractFlinkTransformationProgram.java
index 8667fb6..0f2ad4a 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/AbstractFlinkTransformationProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/AbstractFlinkTransformationProgram.java
@@ -17,25 +17,31 @@
  */
 package org.apache.streampipes.processors.transformation.flink;
 
-import org.apache.streampipes.processors.transformation.flink.config.TransformationFlinkConfig;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.processors.transformation.flink.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 public abstract class AbstractFlinkTransformationProgram<B extends EventProcessorBindingParams> extends FlinkDataProcessorRuntime<B> {
 
-  public AbstractFlinkTransformationProgram(B params, boolean debug) {
-    super(params, debug);
-  }
-
-  public AbstractFlinkTransformationProgram(B params) {
-    super(params, TransformationFlinkConfig.INSTANCE.getDebug());
+  public AbstractFlinkTransformationProgram(B params,
+                                            ConfigExtractor configExtractor,
+                                            StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
-  protected FlinkDeploymentConfig getDeploymentConfig() {
-    return new FlinkDeploymentConfig(TransformationFlinkConfig.JAR_FILE,
-            TransformationFlinkConfig.INSTANCE.getFlinkHost(), TransformationFlinkConfig.INSTANCE.getFlinkPort());
+  protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+    SpConfig config = configExtractor.getConfig();
+    return new FlinkDeploymentConfig(config.getString(
+            ConfigKeys.FLINK_JAR_FILE_LOC),
+            config.getString(ConfigKeys.FLINK_HOST),
+            config.getInteger(ConfigKeys.FLINK_PORT),
+            config.getBoolean(ConfigKeys.DEBUG)
+    );
   }
 
 
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
index 4a1a778..08bc9bc8 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.processors.transformation.flink.config.ConfigKeys;
 import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
 import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
 import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
@@ -37,13 +38,14 @@ import org.apache.streampipes.processors.transformation.flink.processor.rename.F
 
 public class TransformationFlinkInit extends StandaloneModelSubmitter {
 
+  public static final String ServiceGroup = "org.apache.streampipes.processors.transformation.flink";
   public static void main(String[] args) {
     new TransformationFlinkInit().init();
   }
 
   @Override
   public SpServiceDefinition provideServiceDefinition() {
-    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.transformation.flink",
+    return SpServiceDefinitionBuilder.create(ServiceGroup,
                     "Processors Transformation Flink",
                     "",
                     8090)
@@ -62,6 +64,10 @@ public class TransformationFlinkInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+            .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+            .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+            .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
             .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/config/ConfigKeys.java
index ddb4038..3082b22 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/config/ConfigKeys.java
@@ -19,12 +19,8 @@
 package org.apache.streampipes.processors.transformation.flink.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String FLINK_HOST = "SP_FLINK_HOST";
-    final static String FLINK_PORT = "SP_FLINK_PORT";
-    final static String ICON_HOST = "SP_ICON_HOST";
-    final static String ICON_PORT = "SP_ICON_PORT";
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
-    final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_HOST = "SP_FLINK_HOST";
+    public final static String FLINK_PORT = "SP_FLINK_PORT";
+    public final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/config/TransformationFlinkConfig.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/config/TransformationFlinkConfig.java
deleted file mode 100644
index 0f06da5..0000000
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/config/TransformationFlinkConfig.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.transformation.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum TransformationFlinkConfig implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
-  private final static String service_id = "pe/org.apache.streampipes.processors.transformation.flink";
-  private final static String service_name = "Processors Transformation Flink";
-  private final static String service_container_name = "processors-transformation-flink";
-
-  TransformationFlinkConfig() {
-    config = SpServiceDiscovery.getSpConfig(service_id);
-
-    config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component");
-    config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Port for the flink cluster");
-
-    config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
-    config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getDebug() {
-    return config.getBoolean(ConfigKeys.DEBUG);
-  }
-
-  @Override
-  public String getId() {
-    return service_id;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/boilerplate/BoilerplateController.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/boilerplate/BoilerplateController.java
index 74b179a..6f1a1bf 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/boilerplate/BoilerplateController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/boilerplate/BoilerplateController.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.transformation.flink.config.TransformationFlinkConfig;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
@@ -91,6 +90,6 @@ public class BoilerplateController extends FlinkDataProcessorDeclarer<Boilerplat
 
         BoilerplateParameters staticParams = new BoilerplateParameters(graph, htmlProperty, extractorMode, outputMode);
 
-        return new BoilerplateProgram(staticParams, TransformationFlinkConfig.INSTANCE.getDebug());
+        return new BoilerplateProgram(staticParams, configExtractor, streamPipesClient);
     }
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/boilerplate/BoilerplateProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/boilerplate/BoilerplateProgram.java
index 09b1625..09913ea 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/boilerplate/BoilerplateProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/boilerplate/BoilerplateProgram.java
@@ -19,17 +19,17 @@
 package org.apache.streampipes.processors.transformation.flink.processor.boilerplate;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.transformation.flink.AbstractFlinkTransformationProgram;
 
 public class BoilerplateProgram extends AbstractFlinkTransformationProgram<BoilerplateParameters> {
 
-    public BoilerplateProgram(BoilerplateParameters params, boolean debug) {
-        super(params, debug);
-    }
-
-    public BoilerplateProgram(BoilerplateParameters params) {
-        super(params);
+    public BoilerplateProgram(BoilerplateParameters params,
+                              ConfigExtractor configExtractor,
+                              StreamPipesClient streamPipesClient) {
+        super(params, configExtractor, streamPipesClient);
     }
 
     @Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
index e180dfc..f5843b8 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
@@ -33,12 +33,7 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.vocabulary.XSD;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
@@ -86,7 +81,7 @@ public class FieldConverterController extends
             targetDatatype
     );
 
-    return new FieldConverterProgram(staticParams);
+    return new FieldConverterProgram(staticParams, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterProgram.java
index f5592df..2dbcacf 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterProgram.java
@@ -18,17 +18,17 @@
 package org.apache.streampipes.processors.transformation.flink.processor.converter;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.transformation.flink.AbstractFlinkTransformationProgram;
 
 public class FieldConverterProgram extends AbstractFlinkTransformationProgram<FieldConverterParameters> {
 
-  public FieldConverterProgram(FieldConverterParameters params, boolean debug) {
-    super(params, debug);
-  }
-
-  public FieldConverterProgram(FieldConverterParameters params) {
-    super(params);
+  public FieldConverterProgram(FieldConverterParameters params,
+                               ConfigExtractor configExtractor,
+                               StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/FieldHasherController.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/FieldHasherController.java
index af4c0a1..92e19f9 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/FieldHasherController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/FieldHasherController.java
@@ -27,11 +27,7 @@ import org.apache.streampipes.processors.transformation.flink.processor.hasher.a
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
@@ -67,7 +63,7 @@ public class FieldHasherController extends FlinkDataProcessorDeclarer<FieldHashe
     HashAlgorithmType hashAlgorithmType = HashAlgorithmType.valueOf(extractor.selectedSingleValue(HASH_ALGORITHM, String.class));
 
     return new FieldHasherProgram(
-            new FieldHasherParameters(graph, propertyName, hashAlgorithmType));
+            new FieldHasherParameters(graph, propertyName, hashAlgorithmType), configExtractor, streamPipesClient);
   }
 
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/FieldHasherProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/FieldHasherProgram.java
index 500498f..6427489 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/FieldHasherProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/FieldHasherProgram.java
@@ -19,6 +19,8 @@
 package org.apache.streampipes.processors.transformation.flink.processor.hasher;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.transformation.flink.AbstractFlinkTransformationProgram;
 
@@ -27,12 +29,10 @@ import java.io.Serializable;
 public class FieldHasherProgram extends AbstractFlinkTransformationProgram<FieldHasherParameters>
 	implements Serializable{
 
-	public FieldHasherProgram(FieldHasherParameters params, boolean debug) {
-		super(params, debug);
-	}
-
-	public FieldHasherProgram(FieldHasherParameters params) {
-		super(params);
+	public FieldHasherProgram(FieldHasherParameters params,
+							  ConfigExtractor configExtractor,
+							  StreamPipesClient streamPipesClient) {
+		super(params, configExtractor, streamPipesClient);
 	}
 
 	@Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/mapper/FieldMapperController.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/mapper/FieldMapperController.java
index 49a0753..cc0f5b2 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/mapper/FieldMapperController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/mapper/FieldMapperController.java
@@ -74,7 +74,7 @@ public class FieldMapperController extends
     FieldMapperParameters params = new FieldMapperParameters(graph, replacePropertyNames, newFieldName);
 
 
-    return new FieldMapperProgram(params);
+    return new FieldMapperProgram(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/mapper/FieldMapperProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/mapper/FieldMapperProgram.java
index 57b7f1f..cefa5af 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/mapper/FieldMapperProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/mapper/FieldMapperProgram.java
@@ -18,17 +18,17 @@
 package org.apache.streampipes.processors.transformation.flink.processor.mapper;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.transformation.flink.AbstractFlinkTransformationProgram;
 
 public class FieldMapperProgram extends AbstractFlinkTransformationProgram<FieldMapperParameters> {
 
-  public FieldMapperProgram(FieldMapperParameters params, boolean debug) {
-    super(params, debug);
-  }
-
-  public FieldMapperProgram(FieldMapperParameters params) {
-    super(params);
+  public FieldMapperProgram(FieldMapperParameters params,
+                            ConfigExtractor configExtractor,
+                            StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/measurementUnitConverter/MeasurementUnitConverterController.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/measurementUnitConverter/MeasurementUnitConverterController.java
index 1ac180d..c322168 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/measurementUnitConverter/MeasurementUnitConverterController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/measurementUnitConverter/MeasurementUnitConverterController.java
@@ -100,7 +100,7 @@ public class MeasurementUnitConverterController extends
             outputUnit
     );
 
-    return new MeasurementUnitConverterProgram(staticParams);
+    return new MeasurementUnitConverterProgram(staticParams, configExtractor, streamPipesClient);
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/measurementUnitConverter/MeasurementUnitConverterProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/measurementUnitConverter/MeasurementUnitConverterProgram.java
index a85b9d7..c615eed 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/measurementUnitConverter/MeasurementUnitConverterProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/measurementUnitConverter/MeasurementUnitConverterProgram.java
@@ -19,17 +19,17 @@
 package org.apache.streampipes.processors.transformation.flink.processor.measurementUnitConverter;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.transformation.flink.AbstractFlinkTransformationProgram;
 
 public class MeasurementUnitConverterProgram extends AbstractFlinkTransformationProgram<MeasurementUnitConverterParameters> {
 
-    public MeasurementUnitConverterProgram(MeasurementUnitConverterParameters params, boolean debug) {
-        super(params, debug);
-    }
-
-    public MeasurementUnitConverterProgram(MeasurementUnitConverterParameters params) {
-        super(params);
+    public MeasurementUnitConverterProgram(MeasurementUnitConverterParameters params,
+                                           ConfigExtractor configExtractor,
+                                           StreamPipesClient streamPipesClient) {
+        super(params, configExtractor, streamPipesClient);
     }
 
     @Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/rename/FieldRenamerController.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/rename/FieldRenamerController.java
index 4af8cf5..1e20eb0 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/rename/FieldRenamerController.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/rename/FieldRenamerController.java
@@ -26,11 +26,7 @@ import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.helpers.TransformOperations;
+import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
@@ -65,7 +61,7 @@ public class FieldRenamerController extends FlinkDataProcessorDeclarer<FieldRena
     String newPropertyName = extractor.singleValueParameter(FIELD_NAME, String.class);
 
     return new FieldRenamerProgram(
-            new FieldRenamerParameters(graph, oldPropertyName, newPropertyName));
+            new FieldRenamerParameters(graph, oldPropertyName, newPropertyName), configExtractor, streamPipesClient);
   }
 
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/rename/FieldRenamerProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/rename/FieldRenamerProgram.java
index a3103e4..1c0ee57 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/rename/FieldRenamerProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/rename/FieldRenamerProgram.java
@@ -19,17 +19,17 @@
 package org.apache.streampipes.processors.transformation.flink.processor.rename;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.transformation.flink.AbstractFlinkTransformationProgram;
 
 public class FieldRenamerProgram extends AbstractFlinkTransformationProgram<FieldRenamerParameters> {
 
-	public FieldRenamerProgram(FieldRenamerParameters params, boolean debug) {
-		super(params, debug);
-	}
-
-	public FieldRenamerProgram(FieldRenamerParameters params) {
-		super(params);
+	public FieldRenamerProgram(FieldRenamerParameters params,
+							   ConfigExtractor configExtractor,
+							   StreamPipesClient streamPipesClient) {
+		super(params, configExtractor, streamPipesClient);
 	}
 
 	@Override
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/converter/TestConverterProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/converter/TestConverterProgram.java
index 8c3ee8e..7939ae5 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/converter/TestConverterProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/converter/TestConverterProgram.java
@@ -22,12 +22,14 @@ import io.flinkspector.datastream.DataStreamTestBase;
 import io.flinkspector.datastream.input.EventTimeInput;
 import io.flinkspector.datastream.input.EventTimeInputBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,7 +61,8 @@ public class TestConverterProgram extends DataStreamTestBase {
   public void testConverterProgram() {
     FieldConverterParameters params = new FieldConverterParameters(InvocationGraphGenerator.makeEmptyInvocation(new FieldConverterController().declareModel()), "field", targetDatatype);
 
-    FieldConverterProgram program = new FieldConverterProgram(params, true);
+    ConfigExtractor configExtractor = ConfigExtractor.from(TransformationFlinkInit.ServiceGroup);
+    FieldConverterProgram program = new FieldConverterProgram(params, configExtractor, null);
 
     DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData(inputValue)));
 
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherProgram.java
index fd107a7..e291dc3 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherProgram.java
@@ -17,25 +17,23 @@
  */
 package org.apache.streampipes.processors.transformation.flink.processor.hasher;
 
-import static org.apache.streampipes.processors.transformation.flink.processor.hasher.TestFieldHasherUtils.makeTestData;
-
 import io.flinkspector.core.collection.ExpectedRecords;
 import io.flinkspector.datastream.DataStreamTestBase;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
+import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.*;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.HashAlgorithmType;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Md5HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Sha1HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Sha2HashAlgorithm;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 
 import java.util.Arrays;
 
+import static org.apache.streampipes.processors.transformation.flink.processor.hasher.TestFieldHasherUtils.makeTestData;
+
 @RunWith(Parameterized.class)
 @Ignore
 public class TestFieldHasherProgram extends DataStreamTestBase {
@@ -59,7 +57,8 @@ public class TestFieldHasherProgram extends DataStreamTestBase {
   public void testFieldHasherProgram() {
 
     FieldHasherParameters params = makeParams();
-    FieldHasherProgram program = new FieldHasherProgram(params);
+    ConfigExtractor configExtractor = ConfigExtractor.from(TransformationFlinkInit.ServiceGroup);
+    FieldHasherProgram program = new FieldHasherProgram(params, configExtractor, null);
 
     DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeTestData(true, hashAlgorithm)));
 
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/rename/TestRenameProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/rename/TestRenameProgram.java
index d30c1dc..46a4a7d 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/rename/TestRenameProgram.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/rename/TestRenameProgram.java
@@ -22,11 +22,13 @@ import io.flinkspector.datastream.DataStreamTestBase;
 import io.flinkspector.datastream.input.EventTimeInput;
 import io.flinkspector.datastream.input.EventTimeInputBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runners.Parameterized;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,7 +56,8 @@ public class TestRenameProgram extends DataStreamTestBase {
   public void testConverterProgram() {
     FieldRenamerParameters params = new FieldRenamerParameters(InvocationGraphGenerator.makeEmptyInvocation(new FieldRenamerController().declareModel()), oldPropertyName, newPropertyName);
 
-    FieldRenamerProgram program = new FieldRenamerProgram(params, true);
+    ConfigExtractor configExtractor = ConfigExtractor.from(TransformationFlinkInit.ServiceGroup);
+    FieldRenamerProgram program = new FieldRenamerProgram(params, configExtractor, null);
 
     DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData()));
 
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
index 6c8c90c..f9de8cd 100644
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
+++ b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.sinks.databases.flink.config.ConfigKeys;
 import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
 
 public class DatabasesFlinkInit extends StandaloneModelSubmitter {
@@ -52,6 +53,12 @@ public class DatabasesFlinkInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+            .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+            .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+            .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
+            .addConfig(ConfigKeys.ELASTIC_HOST, "elasticsearch", "Elastic search host address")
+            .addConfig(ConfigKeys.ELASTIC_PORT_REST, 9200, "Elasitc search rest port")
             .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/config/ConfigKeys.java
index 001b716..5d6a54e 100644
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/config/ConfigKeys.java
@@ -19,13 +19,10 @@
 package org.apache.streampipes.sinks.databases.flink.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String FLINK_HOST = "SP_FLINK_HOST";
-    final static String FLINK_PORT = "SP_FLINK_PORT";
-    final static String ELASTIC_HOST = "SP_ELASTICSEARCH_HOST";
-    final static String ELASTIC_PORT = "SP_ELASTICSEARCH_PORT";
-    final static String ELASTIC_PORT_REST = "SP_ELASTICSEARCH_PORT_REST";
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
-    final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_HOST = "SP_FLINK_HOST";
+    public final static String FLINK_PORT = "SP_FLINK_PORT";
+    public final static String ELASTIC_HOST = "SP_ELASTICSEARCH_HOST";
+    public final static String ELASTIC_PORT_REST = "SP_ELASTICSEARCH_PORT_REST";
+    public final static String DEBUG = "SP_FLINK_DEBUG";
+    public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/config/DatabasesFlinkConfig.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/config/DatabasesFlinkConfig.java
deleted file mode 100644
index 739e27d..0000000
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/config/DatabasesFlinkConfig.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.databases.flink.config;
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum DatabasesFlinkConfig implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
-  private final static String service_id = "pe/org.apache.streampipes.sinks.databases.flink";
-  private final static String service_name = "Sinks Databases Flink";
-  private final static String service_container_name = "sinks-databases-flink";
-
-  DatabasesFlinkConfig() {
-    config = SpServiceDiscovery.getSpConfig(service_id);
-
-    config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe mixed flink component");
-    config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Port for the flink cluster");
-    config.register(ConfigKeys.ELASTIC_HOST, "elasticsearch", "Elastic search host address");
-    config.register(ConfigKeys.ELASTIC_PORT, 9300, "Elasitc search port");
-    config.register(ConfigKeys.ELASTIC_PORT_REST, 9200, "Elasitc search rest port");
-
-    config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
-    config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public String getElasticsearchHost() {
-    return config.getString(ConfigKeys.ELASTIC_HOST);
-  }
-
-  public int getElasticsearchPort() {
-    return config.getInteger(ConfigKeys.ELASTIC_PORT);
-  }
-
-  public int getElasticsearchPortRest() {
-    return config.getInteger(ConfigKeys.ELASTIC_PORT_REST);
-  }
-
-  public boolean getDebug() {
-    return config.getBoolean(ConfigKeys.DEBUG);
-  }
-
-  @Override
-  public String getId() {
-    return service_id;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-}
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchController.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchController.java
index 7e8fba9..46e4b49 100644
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchController.java
+++ b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchController.java
@@ -31,7 +31,7 @@ import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.sinks.databases.flink.config.DatabasesFlinkConfig;
+import org.apache.streampipes.sinks.databases.flink.config.ConfigKeys;
 import org.apache.streampipes.wrapper.flink.FlinkDataSinkDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataSinkRuntime;
 
@@ -63,10 +63,12 @@ public class ElasticSearchController extends FlinkDataSinkDeclarer<ElasticSearch
 
     String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING);
     String indexName = extractor.singleValueParameter(INDEX_NAME, String.class);
+    String elasticsearchHost = configExtractor.getConfig().getString(ConfigKeys.ELASTIC_HOST);
+    Integer elasticsearchPort = configExtractor.getConfig().getInteger(ConfigKeys.ELASTIC_PORT_REST);
 
-    ElasticSearchParameters params = new ElasticSearchParameters(graph, timestampField, indexName);
+    ElasticSearchParameters params = new ElasticSearchParameters(graph, timestampField, indexName, elasticsearchHost, elasticsearchPort);
 
-    return new ElasticSearchProgram(params, DatabasesFlinkConfig.INSTANCE.getDebug());
+    return new ElasticSearchProgram(params, configExtractor, streamPipesClient);
 
   }
 
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchParameters.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchParameters.java
index 38ee706..2ededc0 100644
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchParameters.java
+++ b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchParameters.java
@@ -25,11 +25,19 @@ public class ElasticSearchParameters extends EventSinkBindingParams {
 
   private String timestampField;
   private String indexName;
+  private String elasticsearchHost;
+  private Integer elasticsearchPort;
 
-  public ElasticSearchParameters(DataSinkInvocation graph, String timestampField, String indexName) {
+  public ElasticSearchParameters(DataSinkInvocation graph,
+                                 String timestampField,
+                                 String indexName,
+                                 String elasticsearchHost,
+                                 Integer elasticsearchPort) {
     super(graph);
     this.timestampField = timestampField;
     this.indexName = indexName;
+    this.elasticsearchHost = elasticsearchHost;
+    this.elasticsearchPort = elasticsearchPort;
   }
 
   public String getTimestampField() {
@@ -39,4 +47,12 @@ public class ElasticSearchParameters extends EventSinkBindingParams {
   public String getIndexName() {
     return indexName;
   }
+
+  public String getElasticsearchHost() {
+    return elasticsearchHost;
+  }
+
+  public Integer getElasticsearchPort() {
+    return elasticsearchPort;
+  }
 }
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchProgram.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchProgram.java
index af05808..65a17d0 100644
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchProgram.java
+++ b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/elasticsearch/ElasticSearchProgram.java
@@ -22,32 +22,38 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.util.Collector;
 import org.apache.http.HttpHost;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.databases.flink.config.DatabasesFlinkConfig;
+import org.apache.streampipes.sinks.databases.flink.config.ConfigKeys;
 import org.apache.streampipes.sinks.databases.flink.elasticsearch.elastic.ElasticsearchSink;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.flink.FlinkDataSinkRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
 
 import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class ElasticSearchProgram extends FlinkDataSinkRuntime<ElasticSearchParameters> implements Serializable {
 
     private static final long serialVersionUID = 1L;
     private static final String INDEX_NAME_PREFIX = "sp_";
 
-    public ElasticSearchProgram(ElasticSearchParameters params, boolean debug) {
-        super(params, debug);
+    public ElasticSearchProgram(ElasticSearchParameters params,
+                                ConfigExtractor configExtractor,
+                                StreamPipesClient streamPipesClient) {
+        super(params, configExtractor, streamPipesClient);
     }
 
     @Override
-    protected FlinkDeploymentConfig getDeploymentConfig() {
-        return new FlinkDeploymentConfig(DatabasesFlinkConfig.JAR_FILE,
-                DatabasesFlinkConfig.INSTANCE.getFlinkHost(), DatabasesFlinkConfig.INSTANCE.getFlinkPort());
+    protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+        SpConfig config = configExtractor.getConfig();
+        return new FlinkDeploymentConfig(config.getString(
+                ConfigKeys.FLINK_JAR_FILE_LOC),
+                config.getString(ConfigKeys.FLINK_HOST),
+                config.getInteger(ConfigKeys.FLINK_PORT),
+                config.getBoolean(ConfigKeys.DEBUG)
+        );
     }
 
     @Override
@@ -56,10 +62,12 @@ public class ElasticSearchProgram extends FlinkDataSinkRuntime<ElasticSearchPara
 
         String indexName = bindingParams.getIndexName();
         String timeName = bindingParams.getTimestampField();
+        String elasticsearchHost = bindingParams.getElasticsearchHost();
+        Integer elasticsearchPort = bindingParams.getElasticsearchPort();
 
         List<HttpHost> httpHosts = Arrays.asList(new HttpHost(
-                DatabasesFlinkConfig.INSTANCE.getElasticsearchHost(),
-                DatabasesFlinkConfig.INSTANCE.getElasticsearchPortRest(),
+                elasticsearchHost,
+                elasticsearchPort,
                 "http"));
 
         Map<String, String> userConfig = new HashMap<>();
diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
index f195aa2..0792e8f 100644
--- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
+++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.svcdiscovery.api.model.ConfigurationScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.Map;
 import java.util.Optional;
 
diff --git a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
index 2c508c7..70f61f8 100644
--- a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
+++ b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.wrapper.distributed.runtime;
 
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.dataformat.SpDataFormatManager;
 import org.apache.streampipes.messaging.kafka.config.ConsumerConfigFactory;
@@ -48,11 +50,13 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
     this.params = runtimeParams.getBindingParams();
   }
 
-  public DistributedRuntime(B bindingParams) {
+  public DistributedRuntime(B bindingParams,
+                            ConfigExtractor configExtractor,
+                            StreamPipesClient streamPipesClient) {
     super();
     this.bindingParams = bindingParams;
     this.params = bindingParams;
-    this.runtimeParams = makeRuntimeParams();
+    this.runtimeParams = makeRuntimeParams(configExtractor, streamPipesClient);
   }
 
   protected I getGraph() {
@@ -115,6 +119,7 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
     return topic.replaceAll("\\*", ".*");
   }
 
-  protected abstract RP makeRuntimeParams();
+  protected abstract RP makeRuntimeParams(ConfigExtractor configExtractor,
+                                          StreamPipesClient streamPipesClient);
 
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
index b9a49d8..23baa1c 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.wrapper.flink;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -33,6 +35,8 @@ import org.apache.streampipes.wrapper.flink.sink.JmsFlinkProducer;
 import org.apache.streampipes.wrapper.flink.sink.MqttFlinkProducer;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Properties;
@@ -43,24 +47,13 @@ public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingP
         DataProcessorInvocation, EventProcessorRuntimeContext> {
 
   private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkDataProcessorRuntime.class);
 
-  /**
-   * @deprecated Use {@link #FlinkDataProcessorRuntime(EventProcessorBindingParams, boolean)} instead
-   */
-  @Deprecated
-  public FlinkDataProcessorRuntime(B params) {
-    super(params);
-  }
-
-  public FlinkDataProcessorRuntime(B params, boolean debug) {
-    super(params, debug);
-  }
 
-  /**
-   * @deprecated Use {@link #FlinkDataProcessorRuntime(EventProcessorBindingParams, boolean)} instead
-   */
-  public FlinkDataProcessorRuntime(B params, FlinkDeploymentConfig config) {
-    super(params, config);
+  public FlinkDataProcessorRuntime(B params,
+                                   ConfigExtractor configExtractor,
+                                   StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @SuppressWarnings("deprecation")
@@ -106,8 +99,10 @@ public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingP
     return props;
   }
 
-  protected EventProcessorRuntimeParams<B> makeRuntimeParams() {
-    // TODO add support for config extractor & client
+  @Override
+  protected EventProcessorRuntimeParams<B> makeRuntimeParams(ConfigExtractor configExtractor,
+                                                             StreamPipesClient streamPipesClient) {
+    LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by a deployed Flink program due to non-serializable classes.");
     return new EventProcessorRuntimeParams<>(bindingParams, false, null, null);
   }
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
index 761c6de..d633eec 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDataSinkRuntime.java
@@ -19,34 +19,26 @@
 package org.apache.streampipes.wrapper.flink;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
 import org.apache.streampipes.wrapper.params.runtime.EventSinkRuntimeParams;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class FlinkDataSinkRuntime<B extends EventSinkBindingParams> extends
         FlinkRuntime<EventSinkRuntimeParams<B>, B, DataSinkInvocation, EventSinkRuntimeContext> {
 
   private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkDataSinkRuntime.class);
 
-  /**
-   * @deprecated Use {@link #FlinkDataSinkRuntime(EventSinkBindingParams, boolean)} instead
-   */
-  public FlinkDataSinkRuntime(B params) {
-    super(params);
-  }
-
-  public FlinkDataSinkRuntime(B params, boolean debug) {
-    super(params, debug);
-  }
-
-  /**
-   * @deprecated Use {@link #FlinkDataSinkRuntime(EventSinkBindingParams, boolean)} instead
-   */
-  public FlinkDataSinkRuntime(B params, FlinkDeploymentConfig config) {
-    super(params, config);
+  public FlinkDataSinkRuntime(B params,
+                              ConfigExtractor configExtractor,
+                              StreamPipesClient streamPipesClient) {
+    super(params, configExtractor, streamPipesClient);
   }
 
   @Override
@@ -57,8 +49,9 @@ public abstract class FlinkDataSinkRuntime<B extends EventSinkBindingParams> ext
 
   public abstract void getSink(DataStream<Event>... convertedStream1);
 
-  protected EventSinkRuntimeParams<B> makeRuntimeParams() {
-    // TODO add support for config extractor & client
+  protected EventSinkRuntimeParams<B> makeRuntimeParams(ConfigExtractor configExtractor,
+                                                        StreamPipesClient streamPipesClient) {
+    LOG.warn("The config extractor and StreamPipes Client can currently not be accessed by a deployed Flink program due to non-serializable classes.");
     return new EventSinkRuntimeParams<>(bindingParams, false, null, null);
   }
 
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDeploymentConfig.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDeploymentConfig.java
index 9babe14..3d251f5 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDeploymentConfig.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkDeploymentConfig.java
@@ -27,14 +27,25 @@ public class FlinkDeploymentConfig implements Serializable {
   private String jarFile;
   private String host;
   private int port;
+  private boolean miniClusterMode;
 
-  public FlinkDeploymentConfig(String jarFile, String host, int port) {
+  public FlinkDeploymentConfig(String jarFile,
+                               String host,
+                               int port) {
     super();
     this.jarFile = jarFile;
     this.host = host;
     this.port = port;
   }
 
+  public FlinkDeploymentConfig(String jarFile,
+                               String host,
+                               int port,
+                               boolean miniClusterMode) {
+    this(jarFile, host, port);
+    this.miniClusterMode = miniClusterMode;
+  }
+
   public String getJarFile() {
     return jarFile;
   }
@@ -47,4 +58,8 @@ public class FlinkDeploymentConfig implements Serializable {
     return port;
   }
 
+  public boolean isMiniClusterMode() {
+    return miniClusterMode;
+  }
+
 }
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
new file mode 100644
index 0000000..c371f96
--- /dev/null
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkMiniClusterDeploymentConfig.java
@@ -0,0 +1,8 @@
+package org.apache.streampipes.wrapper.flink;
+
+public class FlinkMiniClusterDeploymentConfig extends FlinkDeploymentConfig {
+
+    public FlinkMiniClusterDeploymentConfig() {
+        super("", "localhost", 6123, true);
+    }
+}
diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
index 699a094..1c652e3 100644
--- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
+++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java
@@ -28,7 +28,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -60,40 +62,13 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
 
   protected TimeCharacteristic streamTimeCharacteristic;
   protected FlinkDeploymentConfig config;
-
-  private boolean debug;
   private StreamExecutionEnvironment env;
 
-  /**
-   * @deprecated Use {@link #FlinkRuntime(BindingParams, boolean)} instead
-   */
-  @Deprecated
-  public FlinkRuntime(B bindingParams) {
-    this(bindingParams,true);
-  }
-
-  /**
-   * @deprecated Use {@link #FlinkRuntime(BindingParams, boolean)} instead
-   */
-  @Deprecated
-  public FlinkRuntime(B bindingParams, FlinkDeploymentConfig config) {
-    this(bindingParams, config, false);
-  }
-
-  public FlinkRuntime(B bindingParams, boolean debug) {
-    super(bindingParams);
-    if (!debug) {
-      this.config = getDeploymentConfig();
-    } else {
-      this.config = new FlinkDeploymentConfig("", "localhost", 6123);
-    }
-    this.debug = debug;
-  }
-
-  private FlinkRuntime(B bindingParams, FlinkDeploymentConfig config, boolean debug) {
-    super(bindingParams);
-    this.config = config;
-    this.debug = debug;
+  public FlinkRuntime(B bindingParams,
+                      ConfigExtractor configExtractor,
+                      StreamPipesClient streamPipesClient) {
+    super(bindingParams, configExtractor, streamPipesClient);
+    this.config = getDeploymentConfig(configExtractor);
   }
 
   public void run() {
@@ -175,7 +150,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
 
   @Override
   public void prepareRuntime() throws SpRuntimeException {
-    if (debug) {
+    if (config.isMiniClusterMode()) {
       this.env = StreamExecutionEnvironment.createLocalEnvironment();
     } else {
       this.env = StreamExecutionEnvironment
@@ -227,7 +202,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
       // The loop waits until the job is deployed
       // When the deployment takes longer then 60 seconds it returns false
       // This check is not needed when the execution environment is st to local
-      if (!debug) {
+      if (!config.isMiniClusterMode()) {
         boolean isDeployed = false;
         int count = 0;
         do {
@@ -324,7 +299,7 @@ public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends
     }
   }
 
-  protected abstract FlinkDeploymentConfig getDeploymentConfig();
+  protected abstract FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor);
 
   protected abstract void appendExecutionConfig(DataStream<Event>... convertedStream);
 
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventProcessorRuntimeParams.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventProcessorRuntimeParams.java
index b26c7ae..e86df64 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventProcessorRuntimeParams.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/runtime/EventProcessorRuntimeParams.java
@@ -25,8 +25,10 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.context.SpEventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
+import java.io.Serializable;
+
 public class EventProcessorRuntimeParams<B extends EventProcessorBindingParams> extends
-        RuntimeParams<B, DataProcessorInvocation, EventProcessorRuntimeContext> { // B - Bind Type
+        RuntimeParams<B, DataProcessorInvocation, EventProcessorRuntimeContext> implements Serializable {
 
   public EventProcessorRuntimeParams(B bindingParams,
                                      Boolean singletonEngine,

[incubator-streampipes] 01/04: [STREAMPIPES-487] Refactor Flink init classes to use service builder pattern

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit a6d78cb0f20a3ed23eb1fdce64a8f26de349d23f
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Dec 22 23:06:40 2021 +0100

    [STREAMPIPES-487] Refactor Flink init classes to use service builder pattern
---
 streampipes-extensions/pom.xml                     |   6 ++
 .../pom.xml                                        |   1 +
 .../pe/flink/AllFlinkPipelineElementsInit.java     | 108 +++++++--------------
 .../pom.xml                                        |  15 +++
 .../aggregation/flink/AggregationFlinkInit.java    |  44 +++++----
 .../enricher/flink/EnricherFlinkInit.java          |  46 +++++----
 .../processor/geo/flink/GeoFlinkInit.java          |  38 +++++---
 .../detection/flink/PatternDetectionFlinkInit.java |  43 ++++----
 .../statistics/flink/StatisticsFlinkInit.java      |  40 ++++----
 .../flink/TransformationFlinkInit.java             |  47 +++++----
 .../sinks/databases/flink/DatabasesFlinkInit.java  |  37 ++++---
 11 files changed, 224 insertions(+), 201 deletions(-)

diff --git a/streampipes-extensions/pom.xml b/streampipes-extensions/pom.xml
index da1e083..4a3c99f 100644
--- a/streampipes-extensions/pom.xml
+++ b/streampipes-extensions/pom.xml
@@ -549,6 +549,12 @@
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-test-utils_2.11</artifactId>
                 <version>${flink.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-slf4j-impl</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
diff --git a/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml b/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
index 887f7ef..2ff8f1b 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
+++ b/streampipes-extensions/streampipes-pipeline-elements-all-flink/pom.xml
@@ -32,6 +32,7 @@
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-processors-aggregation-flink</artifactId>
             <version>0.69.0-SNAPSHOT</version>
+            <classifier>embed</classifier>
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
diff --git a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java b/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
index 59be7c4..3edfc2d 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
@@ -17,7 +17,8 @@
  */
 package org.apache.streampipes.pe.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -26,82 +27,45 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.pe.flink.config.Config;
-import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
-import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
-import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
-import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
-import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventRateController;
-import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
-import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
-import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
-import org.apache.streampipes.processors.enricher.flink.processor.trigonometry.TrigonometryController;
-import org.apache.streampipes.processors.enricher.flink.processor.urldereferencing.UrlDereferencingController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.sequence.SequenceController;
-import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
-import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
-import org.apache.streampipes.processors.textmining.flink.processor.wordcount.WordCountController;
-import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
-import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
-import org.apache.streampipes.processors.transformation.flink.processor.mapper.FieldMapperController;
-import org.apache.streampipes.processors.transformation.flink.processor.measurementUnitConverter.MeasurementUnitConverterController;
-import org.apache.streampipes.processors.transformation.flink.processor.rename.FieldRenamerController;
-import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
+import org.apache.streampipes.processor.geo.flink.GeoFlinkInit;
+import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
+import org.apache.streampipes.processors.enricher.flink.EnricherFlinkInit;
+import org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit;
+import org.apache.streampipes.processors.statistics.flink.StatisticsFlinkInit;
+import org.apache.streampipes.processors.textmining.flink.TextMiningFlinkInit;
+import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
+import org.apache.streampipes.sinks.databases.flink.DatabasesFlinkInit;
 
 
 public class AllFlinkPipelineElementsInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            // streampipes-processors-aggregation-flink
-            .add(new AggregationController())
-            .add(new CountController())
-            .add(new EventRateController())
-            .add(new EventCountController())
-            // streampipes-processors-enricher-flink
-            .add(new TimestampController())
-            .add(new MathOpController())
-            .add(new StaticMathOpController())
-            .add(new UrlDereferencingController())
-            .add(new TrigonometryController())
-            // streampipes-processors-geo-flink
-            .add(new SpatialGridEnrichmentController())
-            // streampipes-processors-pattern-detection-flink
-            .add(new PeakDetectionController())
-            .add(new SequenceController())
-            .add(new AbsenceController())
-            .add(new AndController())
-            // streampipes-processors-statistics-flink
-            .add(new StatisticsSummaryController())
-            .add(new StatisticsSummaryControllerWindow())
-            // streampipes-processors-text-mining-flink
-            .add(new WordCountController())
-            // streampipes-processors-transformation-flink
-            .add(new FieldConverterController())
-            .add(new FieldHasherController())
-            .add(new FieldMapperController())
-            .add(new MeasurementUnitConverterController())
-            .add(new FieldRenamerController())
-            .add(new BoilerplateController())
-            // streampipes-sinks-databases-flink
-            .add(new ElasticSearchController());
-
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new AllFlinkPipelineElementsInit().init();
+  }
 
-    new AllFlinkPipelineElementsInit().init(Config.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org-apache-streampipes-pe-all-flink",
+                    "StreamPipes Bundled Pipeline Elements for JVM Wrapper",
+                    "",
+                    8090)
+            .merge(new AggregationFlinkInit().provideServiceDefinition())
+            .merge(new EnricherFlinkInit().provideServiceDefinition())
+            .merge(new GeoFlinkInit().provideServiceDefinition())
+            .merge(new PatternDetectionFlinkInit().provideServiceDefinition())
+            .merge(new StatisticsFlinkInit().provideServiceDefinition())
+            .merge(new TextMiningFlinkInit().provideServiceDefinition())
+            .merge(new TransformationFlinkInit().provideServiceDefinition())
+            .merge(new DatabasesFlinkInit().provideServiceDefinition())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml b/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
index 94d88c5..ce4ce40 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
@@ -107,6 +107,21 @@
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <classifier>embed</classifier>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
index 4a41778..f276485 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/AggregationFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.aggregation.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
 import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
 import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
 import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
@@ -36,24 +36,28 @@ import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventR
 public class AggregationFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new AggregationController())
-            .add(new CountController())
-            .add(new EventRateController())
-            .add(new EventCountController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
-
-    new AggregationFlinkInit().init(AggregationFlinkConfig.INSTANCE);
+    new AggregationFlinkInit().init();
   }
 
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.aggregation.flink",
+                    "Processors Aggregation Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new AggregationController(),
+                    new CountController(),
+                    new EventRateController(),
+                    new EventCountController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
+  }
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
index 093f6f4..eb6b46a 100644
--- a/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-enricher-flink/src/main/java/org/apache/streampipes/processors/enricher/flink/EnricherFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.enricher.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.enricher.flink.config.EnricherFlinkConfig;
 import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
 import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
 import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
@@ -37,25 +37,29 @@ import org.apache.streampipes.processors.enricher.flink.processor.urldereferenci
 public class EnricherFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new TimestampController())
-            .add(new MathOpController())
-            .add(new StaticMathOpController())
-            .add(new UrlDereferencingController())
-            .add(new TrigonometryController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
-
-    new EnricherFlinkInit().init(EnricherFlinkConfig.INSTANCE);
+    new EnricherFlinkInit().init();
   }
 
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.enricher.flink",
+                    "Processors Enricher Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new TimestampController(),
+                    new MathOpController(),
+                    new StaticMathOpController(),
+                    new UrlDereferencingController(),
+                    new TrigonometryController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
+  }
 }
diff --git a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
index f531145..3e5a8ed 100644
--- a/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-geo-flink/src/main/java/org/apache/streampipes/processor/geo/flink/GeoFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processor.geo.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,26 +28,31 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processor.geo.flink.config.GeoFlinkConfig;
 import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
 
 public class GeoFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new SpatialGridEnrichmentController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new GeoFlinkInit().init();
+  }
 
-    new GeoFlinkInit().init(GeoFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.geo.flink",
+                    "Processors Geo Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new SpatialGridEnrichmentController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
+
 }
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
index 8c5f0a3..1fbb189 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.pattern.detection.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.pattern.detection.flink.config.PatternDetectionFlinkConfig;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
 import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
@@ -36,22 +36,29 @@ import org.apache.streampipes.processors.pattern.detection.flink.processor.seque
 public class PatternDetectionFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new PeakDetectionController())
-            .add(new SequenceController())
-            .add(new AbsenceController())
-            .add(new AndController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new PatternDetectionFlinkInit().init();
+  }
 
-    new PatternDetectionFlinkInit().init(PatternDetectionFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.patterndetection.flink",
+                    "Processors Pattern Detection Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new PeakDetectionController(),
+                    new SequenceController(),
+                    new AbsenceController(),
+                    new AndController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
+
 }
diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
index 5abf0b9..8ac42b8 100644
--- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.statistics.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,28 +28,33 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.statistics.flink.config.StatisticsFlinkConfig;
 import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
 import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
 
 public class StatisticsFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new StatisticsSummaryController())
-            .add(new StatisticsSummaryControllerWindow());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new StatisticsFlinkInit().init();
+  }
 
-    new StatisticsFlinkInit().init(StatisticsFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.statistics.flink",
+                    "Processors Statistics Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new StatisticsSummaryController(),
+                    new StatisticsSummaryControllerWindow())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
+
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
index e3e0282..4a1a778 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/TransformationFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.processors.transformation.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,7 +28,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.transformation.flink.config.TransformationFlinkConfig;
 import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
 import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
 import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
@@ -38,25 +38,30 @@ import org.apache.streampipes.processors.transformation.flink.processor.rename.F
 public class TransformationFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new FieldConverterController())
-            .add(new FieldHasherController())
-            .add(new FieldMapperController())
-            .add(new MeasurementUnitConverterController())
-            .add(new FieldRenamerController())
-            .add(new BoilerplateController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+    new TransformationFlinkInit().init();
+  }
 
-    new TransformationFlinkInit().init(TransformationFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.transformation.flink",
+                    "Processors Transformation Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new FieldConverterController(),
+                    new FieldHasherController(),
+                    new FieldMapperController(),
+                    new MeasurementUnitConverterController(),
+                    new FieldRenamerController(),
+                    new BoilerplateController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
index f7e3367..6c8c90c 100644
--- a/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
+++ b/streampipes-extensions/streampipes-sinks-databases-flink/src/main/java/org/apache/streampipes/sinks/databases/flink/DatabasesFlinkInit.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.sinks.databases.flink;
 
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -27,26 +28,30 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.sinks.databases.flink.config.DatabasesFlinkConfig;
 import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
 
 public class DatabasesFlinkInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
-    DeclarersSingleton.getInstance()
-            .add(new ElasticSearchController());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
+    new DatabasesFlinkInit().init();
+  }
 
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
-    
-    new DatabasesFlinkInit().init(DatabasesFlinkConfig.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("org.apache.streampipes.sinks.databases.flink",
+                    "Sinks Databases Flink",
+                    "",
+                    8090)
+            .registerPipelineElements(new ElasticSearchController())
+            .registerMessagingFormats(
+                    new JsonDataFormatFactory(),
+                    new CborDataFormatFactory(),
+                    new SmileDataFormatFactory(),
+                    new FstDataFormatFactory())
+            .registerMessagingProtocols(
+                    new SpKafkaProtocolFactory(),
+                    new SpJmsProtocolFactory(),
+                    new SpMqttProtocolFactory())
+            .build();
   }
 }

[incubator-streampipes] 04/04: Merge branch 'dev' of github.com:apache/incubator-streampipes into dev

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 020f6a7c198b12029cee90f61d42d7dd732e84d2
Merge: 8f96f57 b420738
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Dec 28 10:59:52 2021 +0100

    Merge branch 'dev' of github.com:apache/incubator-streampipes into dev

 .../streampipes/connect/adapter/Adapter.java       |   1 +
 .../stream}/DuplicateFilterPipelineElement.java    |   2 +-
 .../stream/EventRateTransformationRule.java        |   5 -
 .../generic/elements/DuplicateFilterTest.java      |   2 +-
 .../downloads/sp_adaptertotestschemarules.csv      |   2 +
 .../fixtures/connect/aggregationRules/expected.csv |   2 +
 .../fixtures/connect/aggregationRules/input.csv    |  11 ++
 .../connect/removeDuplicateRules/expected.csv      |   6 +
 .../connect/removeDuplicateRules/input.csv         |  11 ++
 .../fixtures/connect/schemaRules/expected.csv      |   2 +
 ui/cypress/fixtures/connect/schemaRules/input.csv  |   2 +
 .../fixtures/connect/valueRules/expected.csv       |   2 +
 ui/cypress/fixtures/connect/valueRules/input.csv   |   2 +
 .../support/utils/ConnectEventSchemaUtils.ts       | 135 +++++++++++++++++++++
 .../utils/{AdapterUtils.ts => ConnectUtils.ts}     | 132 +++++++++++---------
 ui/cypress/support/utils/DataLakeUtils.ts          |  21 ++--
 ui/cypress/support/utils/FileManagementUtils.ts    |  20 +--
 .../support/utils/ProcessingElementTestUtils.ts    |   5 +-
 .../support/utils/ThirdPartyIntegrationUtils.ts    |   8 +-
 ui/cypress/tests/adapter/fileStream.smoke.spec.ts  |   6 +-
 .../adapter/machineDataSimulator.smoke.spec.ts     |  15 ++-
 .../tests/adapter/persistInDataLake.smoke.spec.ts  |   4 +-
 .../tests/adapter/randomDataSimulatorStream.ts     |   6 +-
 ui/cypress/tests/adapter/schemaRules.smoke.ts      |  53 ++++++++
 ui/cypress/tests/adapter/streamRules.smoke.ts      |  65 ++++++++++
 ui/cypress/tests/adapter/valueRules.smoke.ts       |  52 ++++++++
 .../tests/datalake/testSpecialCharactersInName.ts  |   4 +-
 .../tests/pipeline/pipelineTest.smoke.spec.ts      |   4 +-
 ui/package.json                                    |   4 +-
 .../event-property-row.component.html              |   9 +-
 .../event-schema/event-schema.component.html       |  11 +-
 .../schema-editor-header.component.html            |  28 +++--
 .../start-adapter-configuration.component.html     |  12 +-
 .../start-adapter-configuration.component.ts       |   2 +-
 .../edit-correction-value.component.html           |  10 +-
 .../edit-data-type/edit-data-type.component.html   |   8 +-
 .../edit-event-property-primitive.component.html   |   8 +-
 .../edit-event-property-primitive.component.ts     |  10 +-
 .../edit-timestamp-property.component.html         |   7 +-
 .../edit-unit-transformation.component.html        |  39 ++++--
 .../edit-event-property.component.html             |  10 +-
 .../edit-event-property.component.ts               |   6 +-
 .../time-series-chart-widget.component.ts          |  16 +--
 43 files changed, 597 insertions(+), 163 deletions(-)

[incubator-streampipes] 02/04: [STREAMPIPES-486] Bump Flink version, add log4j to dependency management

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 89c3842845676e89377c1b9a94664c4641b92357
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Dec 22 23:11:17 2021 +0100

    [STREAMPIPES-486] Bump Flink version, add log4j to dependency management
---
 .../cli/deploy/standalone/jobmanager/docker-compose.yml    |  2 +-
 .../cli/deploy/standalone/taskmanager/docker-compose.yml   |  2 +-
 installer/compose/docker-compose.full.yml                  |  4 ++--
 installer/k8s/values.yaml                                  |  2 +-
 pom.xml                                                    | 14 ++++++++++++--
 5 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/installer/cli/deploy/standalone/jobmanager/docker-compose.yml b/installer/cli/deploy/standalone/jobmanager/docker-compose.yml
index fe692e9..ea436ae 100644
--- a/installer/cli/deploy/standalone/jobmanager/docker-compose.yml
+++ b/installer/cli/deploy/standalone/jobmanager/docker-compose.yml
@@ -16,7 +16,7 @@
 version: "3.4"
 services:
   jobmanager:
-    image: fogsyio/flink:1.13.2-scala_2.11
+    image: fogsyio/flink:1.13.5-scala_2.11
     command: jobmanager
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
diff --git a/installer/cli/deploy/standalone/taskmanager/docker-compose.yml b/installer/cli/deploy/standalone/taskmanager/docker-compose.yml
index d7ecad3..6ff8a50 100644
--- a/installer/cli/deploy/standalone/taskmanager/docker-compose.yml
+++ b/installer/cli/deploy/standalone/taskmanager/docker-compose.yml
@@ -16,7 +16,7 @@
 version: "3.4"
 services:
   taskmanager:
-    image: fogsyio/flink:1.13.2-scala_2.11
+    image: fogsyio/flink:1.13.5-scala_2.11
     depends_on:
       - jobmanager
     command: taskmanager
diff --git a/installer/compose/docker-compose.full.yml b/installer/compose/docker-compose.full.yml
index 8d33814..38ae812 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -168,7 +168,7 @@ services:
       spnet:
 
   jobmanager:
-    image: fogsyio/flink:1.13.2-scala_2.11
+    image: fogsyio/flink:1.13.5-scala_2.11
     command: jobmanager
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
@@ -178,7 +178,7 @@ services:
       spnet:
 
   taskmanager:
-    image: fogsyio/flink:1.13.2-scala_2.11
+    image: fogsyio/flink:1.13.5-scala_2.11
     depends_on:
       - jobmanager
     command: taskmanager
diff --git a/installer/k8s/values.yaml b/installer/k8s/values.yaml
index 8b9263d..382f41e 100644
--- a/installer/k8s/values.yaml
+++ b/installer/k8s/values.yaml
@@ -27,7 +27,7 @@ external:
   activemqVersion: 5.15.9
   consulVersion: 1.9.6
   couchdbVersion: 2.3.1
-  flinkVersion: 1.13.2-scala_2.11
+  flinkVersion: 1.13.5-scala_2.11
   kafkaVersion: 2.2.0
   zookeeperVersion: 3.4.13
   influxdbVersion: 1.7
diff --git a/pom.xml b/pom.xml
index 8d0f2b2..63beeca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@
         <consul-client.version>1.5.2</consul-client.version>
         <elasticsearch.version>6.6.2</elasticsearch.version>
         <empire-rdf4j.version>1.9.14</empire-rdf4j.version>
-        <flink.version>1.13.2</flink.version>
+        <flink.version>1.13.5</flink.version>
         <fogsy-qudt.version>1.0</fogsy-qudt.version>
         <fst.version>2.56</fst.version>
         <geojson-jackson.version>1.8</geojson-jackson.version>
@@ -83,7 +83,7 @@
         <jsr305.version>3.0.2</jsr305.version>
         <kafka.version>2.7.0</kafka.version>
         <lightcouch.version>0.2.0</lightcouch.version>
-        <log4j.version>2.12.1</log4j.version>
+        <log4j.version>2.17.0</log4j.version>
         <lz4.version>1.7.1</lz4.version>
         <logback-classic.version>1.2.3</logback-classic.version>
         <maven-invoker.version>2.2</maven-invoker.version>
@@ -401,6 +401,16 @@
                 <version>${log4j.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-core</artifactId>
+                <version>${log4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-to-slf4j</artifactId>
+                <version>${log4j.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.maven.shared</groupId>
                 <artifactId>maven-invoker</artifactId>
                 <version>${maven-invoker.version}</version>