You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/09/07 08:27:40 UTC

[incubator-streampipes-extensions] 02/02: [STREAMPIPES-225] add pipeline-elements-all-flink module

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 6a88d198196bc55a72feed3ddf3a5492bc20492c
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Sep 7 10:27:14 2020 +0200

    [STREAMPIPES-225] add pipeline-elements-all-flink module
---
 .github/workflows/build.yml                        |  20 ++++
 pom.xml                                            |   1 +
 streampipes-pipeline-elements-all-flink/Dockerfile |  25 +++++
 .../aarch64.Dockerfile                             |  26 +++++
 .../arm.Dockerfile                                 |  26 +++++
 .../development/env                                |  20 ++++
 streampipes-pipeline-elements-all-flink/pom.xml    | 122 +++++++++++++++++++++
 .../pe/flink/AllFlinkPipelineElementsInit.java     | 103 +++++++++++++++++
 .../apache/streampipes/pe/flink/config/Config.java |  75 +++++++++++++
 .../streampipes/pe/flink/config/ConfigKeys.java    |  27 +++++
 10 files changed, 445 insertions(+)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index ea01b08..ac3b916 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -83,6 +83,26 @@ jobs:
           docker manifest annotate $IMG_NAME_DEFAULT $IMG_NAME_ARM32V7 --os linux --arch arm
           docker manifest annotate $IMG_NAME_DEFAULT $IMG_NAME_ARM64V8 --os linux --arch arm64
           docker manifest push $IMG_NAME_DEFAULT
+      # Builds the amd64, arm32v7 and arm64v8 images of the bundled Flink pipeline elements,
+      # pushes every tag and finally publishes a multi-arch manifest under the default tag.
+      - name: Build and Push Docker Image pipeline-elements-all-flink
+        working-directory: ./streampipes-pipeline-elements-all-flink
+        env:
+          IMG_NAME_DEFAULT: ${{ env.DOCKERHUB_APACHE_REPO }}/pipeline-elements-all-flink:${{ env.MVN_VERSION }}
+          IMG_NAME_AMD64: ${{ env.DOCKERHUB_APACHE_REPO }}/pipeline-elements-all-flink:amd64-${{ env.MVN_VERSION }}
+          IMG_NAME_ARM32V7: ${{ env.DOCKERHUB_APACHE_REPO }}/pipeline-elements-all-flink:arm32v7-${{ env.MVN_VERSION }}
+          IMG_NAME_ARM64V8: ${{ env.DOCKERHUB_APACHE_REPO }}/pipeline-elements-all-flink:arm64v8-${{ env.MVN_VERSION }}
+        # The qemu binaries are copied into the build context because arm.Dockerfile and
+        # aarch64.Dockerfile COPY them into the image.
+        # The build-arg must be called BASE_IMAGE: that is the ARG name declared by the
+        # Dockerfiles of this module, any other name is not consumed by the build.
+        run: |
+          cp /usr/bin/{qemu-arm-static,qemu-aarch64-static} .
+          docker build --pull --build-arg BASE_IMAGE=$BASE_IMG_JRE_DEFAULT -t $IMG_NAME_DEFAULT -t $IMG_NAME_AMD64 -f Dockerfile .
+          docker build --pull --build-arg BASE_IMAGE=$BASE_IMG_JRE_ARM32V7 -t $IMG_NAME_ARM32V7 -f arm.Dockerfile .
+          docker build --pull --build-arg BASE_IMAGE=$BASE_IMG_JRE_ARM64V8 -t $IMG_NAME_ARM64V8 -f aarch64.Dockerfile .
+          docker push $IMG_NAME_DEFAULT
+          docker push $IMG_NAME_AMD64
+          docker push $IMG_NAME_ARM32V7
+          docker push $IMG_NAME_ARM64V8
+          docker manifest create $IMG_NAME_DEFAULT $IMG_NAME_AMD64 $IMG_NAME_ARM32V7 $IMG_NAME_ARM64V8
+          docker manifest annotate $IMG_NAME_DEFAULT $IMG_NAME_ARM32V7 --os linux --arch arm
+          docker manifest annotate $IMG_NAME_DEFAULT $IMG_NAME_ARM64V8 --os linux --arch arm64
+          docker manifest push $IMG_NAME_DEFAULT
       - name: Build and Push Docker Image pipeline-elements-all-jvm
         working-directory: ./streampipes-pipeline-elements-all-jvm
         env:
diff --git a/pom.xml b/pom.xml
index 255c493..d2f18ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
         <module>streampipes-pipeline-elements-all-jvm</module>
         <module>streampipes-pipeline-elements-data-simulator</module>
         <module>streampipes-connect-adapters</module>
+        <module>streampipes-pipeline-elements-all-flink</module>
     </modules>
 
     <properties>
diff --git a/streampipes-pipeline-elements-all-flink/Dockerfile b/streampipes-pipeline-elements-all-flink/Dockerfile
new file mode 100644
index 0000000..5e96faf
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/Dockerfile
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Base image; can be overridden at build time with --build-arg BASE_IMAGE=<image>.
+ARG BASE_IMAGE=adoptopenjdk/openjdk8-openj9:alpine-slim
+FROM ${BASE_IMAGE}
+
+# NOTE(review): presumably the host name under which Consul is reachable - confirm.
+ENV CONSUL_LOCATION=consul
+
+# 8090 is also the port value registered for SP_PORT in this module's Config.java.
+EXPOSE 8090
+
+# The jar name is fixed by <finalName> in this module's pom.xml.
+COPY target/streampipes-pipeline-elements-all-flink.jar /streampipes-processing-element-container.jar
+
+ENTRYPOINT ["java", "-jar", "/streampipes-processing-element-container.jar"]
diff --git a/streampipes-pipeline-elements-all-flink/aarch64.Dockerfile b/streampipes-pipeline-elements-all-flink/aarch64.Dockerfile
new file mode 100644
index 0000000..01eae1b
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/aarch64.Dockerfile
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Base image; can be overridden at build time with --build-arg BASE_IMAGE=<image>.
+ARG BASE_IMAGE=arm64v8/openjdk:11-jre-slim
+FROM ${BASE_IMAGE}
+
+# NOTE(review): presumably the host name under which Consul is reachable - confirm.
+ENV CONSUL_LOCATION=consul
+
+# 8090 is also the port value registered for SP_PORT in this module's Config.java.
+EXPOSE 8090
+
+# Expects qemu-aarch64-static in the build context.
+# NOTE(review): presumably needed to emulate arm64 on a non-ARM host - confirm.
+COPY qemu-aarch64-static /usr/bin
+# The jar name is fixed by <finalName> in this module's pom.xml.
+COPY target/streampipes-pipeline-elements-all-flink.jar /streampipes-processing-element-container.jar
+
+ENTRYPOINT ["java", "-jar", "/streampipes-processing-element-container.jar"]
diff --git a/streampipes-pipeline-elements-all-flink/arm.Dockerfile b/streampipes-pipeline-elements-all-flink/arm.Dockerfile
new file mode 100644
index 0000000..189a6db
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/arm.Dockerfile
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Base image; can be overridden at build time with --build-arg BASE_IMAGE=<image>.
+ARG BASE_IMAGE=arm32v7/openjdk:11-jre-slim
+FROM ${BASE_IMAGE}
+
+# NOTE(review): presumably the host name under which Consul is reachable - confirm.
+ENV CONSUL_LOCATION=consul
+
+# 8090 is also the port value registered for SP_PORT in this module's Config.java.
+EXPOSE 8090
+
+# Expects qemu-arm-static in the build context.
+# NOTE(review): presumably needed to emulate 32-bit ARM on a non-ARM host - confirm.
+COPY qemu-arm-static /usr/bin
+# The jar name is fixed by <finalName> in this module's pom.xml.
+COPY target/streampipes-pipeline-elements-all-flink.jar /streampipes-processing-element-container.jar
+
+ENTRYPOINT ["java", "-jar", "/streampipes-processing-element-container.jar"]
diff --git a/streampipes-pipeline-elements-all-flink/development/env b/streampipes-pipeline-elements-all-flink/development/env
new file mode 100644
index 0000000..2eaca6f
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/development/env
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These parameters are used by IntelliJ to set the default Consul parameters for development.
+# Service port; Config.java registers 8090 for SP_PORT, this file uses 8005 instead.
+SP_PORT=8005
+# Service host.
+# NOTE(review): host.docker.internal presumably lets containers reach a service running on the developer's host - confirm.
+SP_HOST=host.docker.internal
+# NOTE(review): SP_DEBUG is not among this module's ConfigKeys; presumably read elsewhere in StreamPipes - confirm.
+SP_DEBUG=true
+# Per the key description in Config.java: programs are executed locally instead of being deployed to a cluster.
+SP_FLINK_DEBUG=true
\ No newline at end of file
diff --git a/streampipes-pipeline-elements-all-flink/pom.xml b/streampipes-pipeline-elements-all-flink/pom.xml
new file mode 100644
index 0000000..6d31766
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streampipes-extensions</artifactId>
+        <groupId>org.apache.streampipes</groupId>
+        <version>0.68.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streampipes-pipeline-elements-all-flink</artifactId>
+
+    <dependencies>
+        <!-- StreamPipes Flink pipeline element modules bundled into this container;
+             AllFlinkPipelineElementsInit registers controllers from each of them -->
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-processors-aggregation-flink</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-processors-enricher-flink</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-processors-geo-flink</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-processors-pattern-detection-flink</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-processors-statistics-flink</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-processors-text-mining-flink</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-processors-transformation-flink</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-sinks-databases-flink</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
+
+        <!-- Third-party dependencies declared explicitly to avoid dependency convergence errors.
+             NOTE(review): no <version> is given here; presumably managed by a parent POM - confirm -->
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.atteo.classindex</groupId>
+            <artifactId>classindex</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <!-- Fixed artifact name: the module's Dockerfiles copy
+             target/streampipes-pipeline-elements-all-flink.jar into the image -->
+        <finalName>streampipes-pipeline-elements-all-flink</finalName>
+        <plugins>
+            <!-- Shades all dependencies into one jar during the package phase -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <transformers>
+                                <!-- Resource files that several dependencies ship under the same
+                                     path are combined instead of one copy overwriting the others -->
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>META-INF/spring.handlers</resource>
+                                </transformer>
+                                <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
+                                    <resource>META-INF/spring.factories</resource>
+                                </transformer>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>META-INF/spring.schemas</resource>
+                                </transformer>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>reference.conf</resource>
+                                </transformer>
+                                <!-- Writes the entry point into the jar manifest so that "java -jar" works -->
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.streampipes.pe.flink.AllFlinkPipelineElementsInit</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
new file mode 100644
index 0000000..4818571
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.flink;
+
+import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
+import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
+import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
+import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
+import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
+import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
+import org.apache.streampipes.pe.flink.config.Config;
+import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
+import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
+import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
+import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
+import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventRateController;
+import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
+import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
+import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
+import org.apache.streampipes.processors.enricher.flink.processor.trigonometry.TrigonometryController;
+import org.apache.streampipes.processors.enricher.flink.processor.urldereferencing.UrlDereferencingController;
+import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
+import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
+import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
+import org.apache.streampipes.processors.pattern.detection.flink.processor.sequence.SequenceController;
+import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
+import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
+import org.apache.streampipes.processors.textmining.flink.processor.wordcount.WordCountController;
+import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
+import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
+import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
+import org.apache.streampipes.processors.transformation.flink.processor.mapper.FieldMapperController;
+import org.apache.streampipes.processors.transformation.flink.processor.measurementUnitConverter.MeasurementUnitConverterController;
+import org.apache.streampipes.processors.transformation.flink.processor.rename.FieldRenamerController;
+import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
+
+
+public class AllFlinkPipelineElementsInit extends StandaloneModelSubmitter {
+
+  public static void main(String[] args) {
+    DeclarersSingleton.getInstance()
+            // streampipes-processors-aggregation-flink
+            .add(new AggregationController())
+            .add(new CountController())
+            .add(new EventRateController())
+            .add(new EventCountController())
+            // streampipes-processors-enricher-flink
+            .add(new TimestampController())
+            .add(new MathOpController())
+            .add(new StaticMathOpController())
+            .add(new UrlDereferencingController())
+            .add(new TrigonometryController())
+            // streampipes-processors-geo-flink
+            .add(new SpatialGridEnrichmentController())
+            // streampipes-processors-pattern-detection-flink
+            .add(new PeakDetectionController())
+            .add(new SequenceController())
+            .add(new AbsenceController())
+            .add(new AndController())
+            // streampipes-processors-statistics-flink
+            .add(new StatisticsSummaryController())
+            .add(new StatisticsSummaryControllerWindow())
+            // streampipes-processors-text-mining-flink
+            .add(new WordCountController())
+            // streampipes-processors-transformation-flink
+            .add(new FieldConverterController())
+            .add(new FieldHasherController())
+            .add(new FieldMapperController())
+            .add(new MeasurementUnitConverterController())
+            .add(new FieldRenamerController())
+            .add(new BoilerplateController())
+            // streampipes-sinks-databases-flink
+            .add(new ElasticSearchController());
+
+
+    DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
+            new CborDataFormatFactory(),
+            new SmileDataFormatFactory(),
+            new FstDataFormatFactory());
+
+    DeclarersSingleton.getInstance().registerProtocols(new SpKafkaProtocolFactory(),
+            new SpJmsProtocolFactory());
+
+    new AllFlinkPipelineElementsInit().init(Config.INSTANCE);
+  }
+}
diff --git a/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java
new file mode 100644
index 0000000..c4618bb
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.flink.config;
+
+import org.apache.streampipes.config.SpConfig;
+import org.apache.streampipes.container.model.PeConfig;
+
+public enum Config implements PeConfig {
+  INSTANCE;
+
+  private final SpConfig config;
+  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
+  private final static String SERVICE_ID = "pe/org.apache.streampipes.processors.all.flink";
+  private final static String SERVICE_NAME = "Processors Flink (Bundle)";
+  private final static String SERVICE_CONTAINER_NAME = "pipeline-elements-all-flink";
+
+  Config() {
+    config = SpConfig.getSpConfig(SERVICE_ID);
+    config.register(ConfigKeys.HOST, SERVICE_CONTAINER_NAME, "Data processor host");
+    config.register(ConfigKeys.PORT, 8090, "Data processor port");
+    config.register(ConfigKeys.SERVICE_NAME, SERVICE_NAME, "Data processor service name");
+    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
+    config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
+    config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
+  }
+
+  public String getFlinkHost() {
+    return config.getString(ConfigKeys.FLINK_HOST);
+  }
+
+  public int getFlinkPort() {
+    return config.getInteger(ConfigKeys.FLINK_PORT);
+  }
+
+  public boolean getFlinkDebug() {
+    return config.getBoolean(ConfigKeys.FLINK_DEBUG);
+  }
+
+  @Override
+  public String getHost() {
+    return config.getString(ConfigKeys.HOST);
+  }
+
+  @Override
+  public int getPort() {
+    return config.getInteger(ConfigKeys.PORT);
+  }
+
+  @Override
+  public String getId() {
+    return SERVICE_ID;
+  }
+
+  @Override
+  public String getName() {
+    return config.getString(ConfigKeys.SERVICE_NAME);
+  }
+
+
+}
diff --git a/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java
new file mode 100644
index 0000000..cacc5ec
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.flink.config;
+
+public class ConfigKeys {
+  final static String HOST = "SP_HOST";
+  final static String PORT = "SP_PORT";
+  final static String SERVICE_NAME = "SP_SERVICE_NAME";
+  final static String FLINK_HOST = "SP_FLINK_HOST";
+  final static String FLINK_PORT = "SP_FLINK_PORT";
+  final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
+}