You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/09/07 08:27:40 UTC
[incubator-streampipes-extensions] 02/02: [STREAMPIPES-225] add
pipeline-elements-all-flink module
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 6a88d198196bc55a72feed3ddf3a5492bc20492c
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Mon Sep 7 10:27:14 2020 +0200
[STREAMPIPES-225] add pipeline-elements-all-flink module
---
.github/workflows/build.yml | 20 ++++
pom.xml | 1 +
streampipes-pipeline-elements-all-flink/Dockerfile | 25 +++++
.../aarch64.Dockerfile | 26 +++++
.../arm.Dockerfile | 26 +++++
.../development/env | 20 ++++
streampipes-pipeline-elements-all-flink/pom.xml | 122 +++++++++++++++++++++
.../pe/flink/AllFlinkPipelineElementsInit.java | 103 +++++++++++++++++
.../apache/streampipes/pe/flink/config/Config.java | 75 +++++++++++++
.../streampipes/pe/flink/config/ConfigKeys.java | 27 +++++
10 files changed, 445 insertions(+)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index ea01b08..ac3b916 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -83,6 +83,26 @@ jobs:
docker manifest annotate $IMG_NAME_DEFAULT $IMG_NAME_ARM32V7 --os linux --arch arm
docker manifest annotate $IMG_NAME_DEFAULT $IMG_NAME_ARM64V8 --os linux --arch arm64
docker manifest push $IMG_NAME_DEFAULT
+ - name: Build and Push Docker Image pipeline-elements-all-flink
+ working-directory: ./streampipes-pipeline-elements-all-flink
+ env:
+ IMG_NAME_DEFAULT: ${{ env.DOCKERHUB_APACHE_REPO }}/pipeline-elements-all-flink:${{ env.MVN_VERSION }}
+ IMG_NAME_AMD64: ${{ env.DOCKERHUB_APACHE_REPO }}/pipeline-elements-all-flink:amd64-${{ env.MVN_VERSION }}
+ IMG_NAME_ARM32V7: ${{ env.DOCKERHUB_APACHE_REPO }}/pipeline-elements-all-flink:arm32v7-${{ env.MVN_VERSION }}
+ IMG_NAME_ARM64V8: ${{ env.DOCKERHUB_APACHE_REPO }}/pipeline-elements-all-flink:arm64v8-${{ env.MVN_VERSION }}
+ run: |
+ cp /usr/bin/{qemu-arm-static,qemu-aarch64-static} .
+ docker build --pull --build-arg BASE_IMG=$BASE_IMG_JRE_DEFAULT -t $IMG_NAME_DEFAULT -t $IMG_NAME_AMD64 -f Dockerfile .
+ docker build --pull --build-arg BASE_IMG=$BASE_IMG_JRE_ARM32V7 -t $IMG_NAME_ARM32V7 -f arm.Dockerfile .
+ docker build --pull --build-arg BASE_IMG=$BASE_IMG_JRE_ARM64V8 -t $IMG_NAME_ARM64V8 -f aarch64.Dockerfile .
+ docker push $IMG_NAME_DEFAULT
+ docker push $IMG_NAME_AMD64
+ docker push $IMG_NAME_ARM32V7
+ docker push $IMG_NAME_ARM64V8
+ docker manifest create $IMG_NAME_DEFAULT $IMG_NAME_AMD64 $IMG_NAME_ARM32V7 $IMG_NAME_ARM64V8
+ docker manifest annotate $IMG_NAME_DEFAULT $IMG_NAME_ARM32V7 --os linux --arch arm
+ docker manifest annotate $IMG_NAME_DEFAULT $IMG_NAME_ARM64V8 --os linux --arch arm64
+ docker manifest push $IMG_NAME_DEFAULT
- name: Build and Push Docker Image pipeline-elements-all-jvm
working-directory: ./streampipes-pipeline-elements-all-jvm
env:
diff --git a/pom.xml b/pom.xml
index 255c493..d2f18ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
<module>streampipes-pipeline-elements-all-jvm</module>
<module>streampipes-pipeline-elements-data-simulator</module>
<module>streampipes-connect-adapters</module>
+ <module>streampipes-pipeline-elements-all-flink</module>
</modules>
<properties>
diff --git a/streampipes-pipeline-elements-all-flink/Dockerfile b/streampipes-pipeline-elements-all-flink/Dockerfile
new file mode 100644
index 0000000..5e96faf
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/Dockerfile
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ARG BASE_IMAGE=adoptopenjdk/openjdk8-openj9:alpine-slim
+FROM $BASE_IMAGE
+
+ENV CONSUL_LOCATION consul
+
+EXPOSE 8090
+
+COPY target/streampipes-pipeline-elements-all-flink.jar /streampipes-processing-element-container.jar
+
+ENTRYPOINT ["java", "-jar", "/streampipes-processing-element-container.jar"]
diff --git a/streampipes-pipeline-elements-all-flink/aarch64.Dockerfile b/streampipes-pipeline-elements-all-flink/aarch64.Dockerfile
new file mode 100644
index 0000000..01eae1b
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/aarch64.Dockerfile
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ARG BASE_IMAGE=arm64v8/openjdk:11-jre-slim
+FROM $BASE_IMAGE
+
+ENV CONSUL_LOCATION consul
+
+EXPOSE 8090
+
+COPY qemu-aarch64-static /usr/bin
+COPY target/streampipes-pipeline-elements-all-flink.jar /streampipes-processing-element-container.jar
+
+ENTRYPOINT ["java", "-jar", "/streampipes-processing-element-container.jar"]
diff --git a/streampipes-pipeline-elements-all-flink/arm.Dockerfile b/streampipes-pipeline-elements-all-flink/arm.Dockerfile
new file mode 100644
index 0000000..189a6db
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/arm.Dockerfile
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ARG BASE_IMAGE=arm32v7/openjdk:11-jre-slim
+FROM $BASE_IMAGE
+
+ENV CONSUL_LOCATION consul
+
+EXPOSE 8090
+
+COPY qemu-arm-static /usr/bin
+COPY target/streampipes-pipeline-elements-all-flink.jar /streampipes-processing-element-container.jar
+
+ENTRYPOINT ["java", "-jar", "/streampipes-processing-element-container.jar"]
diff --git a/streampipes-pipeline-elements-all-flink/development/env b/streampipes-pipeline-elements-all-flink/development/env
new file mode 100644
index 0000000..2eaca6f
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/development/env
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Those parameters are used by IntelliJ to set the default consul parameters for development
+SP_PORT=8005
+SP_HOST=host.docker.internal
+SP_DEBUG=true
+SP_FLINK_DEBUG=true
\ No newline at end of file
diff --git a/streampipes-pipeline-elements-all-flink/pom.xml b/streampipes-pipeline-elements-all-flink/pom.xml
new file mode 100644
index 0000000..6d31766
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streampipes-extensions</artifactId>
+ <groupId>org.apache.streampipes</groupId>
+ <version>0.68.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streampipes-pipeline-elements-all-flink</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-processors-aggregation-flink</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-processors-enricher-flink</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-processors-geo-flink</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-processors-pattern-detection-flink</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-processors-statistics-flink</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-processors-text-mining-flink</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-processors-transformation-flink</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-sinks-databases-flink</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+
+ <!-- 3rd party dependencies to avoid convergence errors -->
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.atteo.classindex</groupId>
+ <artifactId>classindex</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.handlers</resource>
+ </transformer>
+ <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
+ <resource>META-INF/spring.factories</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.schemas</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.streampipes.pe.flink.AllFlinkPipelineElementsInit
+ </mainClass>
+ </transformer>
+ </transformers>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <finalName>streampipes-pipeline-elements-all-flink</finalName>
+ </build>
+</project>
\ No newline at end of file
diff --git a/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
new file mode 100644
index 0000000..4818571
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/AllFlinkPipelineElementsInit.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.flink;
+
+import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
+import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
+import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
+import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
+import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
+import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
+import org.apache.streampipes.pe.flink.config.Config;
+import org.apache.streampipes.processor.geo.flink.processor.gridenricher.SpatialGridEnrichmentController;
+import org.apache.streampipes.processors.aggregation.flink.processor.aggregation.AggregationController;
+import org.apache.streampipes.processors.aggregation.flink.processor.count.CountController;
+import org.apache.streampipes.processors.aggregation.flink.processor.eventcount.EventCountController;
+import org.apache.streampipes.processors.aggregation.flink.processor.rate.EventRateController;
+import org.apache.streampipes.processors.enricher.flink.processor.math.mathop.MathOpController;
+import org.apache.streampipes.processors.enricher.flink.processor.math.staticmathop.StaticMathOpController;
+import org.apache.streampipes.processors.enricher.flink.processor.timestamp.TimestampController;
+import org.apache.streampipes.processors.enricher.flink.processor.trigonometry.TrigonometryController;
+import org.apache.streampipes.processors.enricher.flink.processor.urldereferencing.UrlDereferencingController;
+import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
+import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
+import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.PeakDetectionController;
+import org.apache.streampipes.processors.pattern.detection.flink.processor.sequence.SequenceController;
+import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController;
+import org.apache.streampipes.processors.statistics.flink.processor.stat.window.StatisticsSummaryControllerWindow;
+import org.apache.streampipes.processors.textmining.flink.processor.wordcount.WordCountController;
+import org.apache.streampipes.processors.transformation.flink.processor.boilerplate.BoilerplateController;
+import org.apache.streampipes.processors.transformation.flink.processor.converter.FieldConverterController;
+import org.apache.streampipes.processors.transformation.flink.processor.hasher.FieldHasherController;
+import org.apache.streampipes.processors.transformation.flink.processor.mapper.FieldMapperController;
+import org.apache.streampipes.processors.transformation.flink.processor.measurementUnitConverter.MeasurementUnitConverterController;
+import org.apache.streampipes.processors.transformation.flink.processor.rename.FieldRenamerController;
+import org.apache.streampipes.sinks.databases.flink.elasticsearch.ElasticSearchController;
+
+
+public class AllFlinkPipelineElementsInit extends StandaloneModelSubmitter {
+
+ public static void main(String[] args) {
+ DeclarersSingleton.getInstance()
+ // streampipes-processors-aggregation-flink
+ .add(new AggregationController())
+ .add(new CountController())
+ .add(new EventRateController())
+ .add(new EventCountController())
+ // streampipes-processors-enricher-flink
+ .add(new TimestampController())
+ .add(new MathOpController())
+ .add(new StaticMathOpController())
+ .add(new UrlDereferencingController())
+ .add(new TrigonometryController())
+ // streampipes-processors-geo-flink
+ .add(new SpatialGridEnrichmentController())
+ // streampipes-processors-pattern-detection-flink
+ .add(new PeakDetectionController())
+ .add(new SequenceController())
+ .add(new AbsenceController())
+ .add(new AndController())
+ // streampipes-processors-statistics-flink
+ .add(new StatisticsSummaryController())
+ .add(new StatisticsSummaryControllerWindow())
+ // streampipes-processors-text-mining-flink
+ .add(new WordCountController())
+ // streampipes-processors-transformation-flink
+ .add(new FieldConverterController())
+ .add(new FieldHasherController())
+ .add(new FieldMapperController())
+ .add(new MeasurementUnitConverterController())
+ .add(new FieldRenamerController())
+ .add(new BoilerplateController())
+ // streampipes-sinks-databases-flink
+ .add(new ElasticSearchController());
+
+
+ DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory());
+
+ DeclarersSingleton.getInstance().registerProtocols(new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory());
+
+ new AllFlinkPipelineElementsInit().init(Config.INSTANCE);
+ }
+}
diff --git a/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java
new file mode 100644
index 0000000..c4618bb
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/Config.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.flink.config;
+
+import org.apache.streampipes.config.SpConfig;
+import org.apache.streampipes.container.model.PeConfig;
+
+public enum Config implements PeConfig {
+ INSTANCE;
+
+ private final SpConfig config;
+ public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
+ private final static String SERVICE_ID = "pe/org.apache.streampipes.processors.all.flink";
+ private final static String SERVICE_NAME = "Processors Flink (Bundle)";
+ private final static String SERVICE_CONTAINER_NAME = "pipeline-elements-all-flink";
+
+ Config() {
+ config = SpConfig.getSpConfig(SERVICE_ID);
+ config.register(ConfigKeys.HOST, SERVICE_CONTAINER_NAME, "Data processor host");
+ config.register(ConfigKeys.PORT, 8090, "Data processor port");
+ config.register(ConfigKeys.SERVICE_NAME, SERVICE_NAME, "Data processor service name");
+ config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
+ config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
+ config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
+ }
+
+ public String getFlinkHost() {
+ return config.getString(ConfigKeys.FLINK_HOST);
+ }
+
+ public int getFlinkPort() {
+ return config.getInteger(ConfigKeys.FLINK_PORT);
+ }
+
+ public boolean getFlinkDebug() {
+ return config.getBoolean(ConfigKeys.FLINK_DEBUG);
+ }
+
+ @Override
+ public String getHost() {
+ return config.getString(ConfigKeys.HOST);
+ }
+
+ @Override
+ public int getPort() {
+ return config.getInteger(ConfigKeys.PORT);
+ }
+
+ @Override
+ public String getId() {
+ return SERVICE_ID;
+ }
+
+ @Override
+ public String getName() {
+ return config.getString(ConfigKeys.SERVICE_NAME);
+ }
+
+
+}
diff --git a/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java
new file mode 100644
index 0000000..cacc5ec
--- /dev/null
+++ b/streampipes-pipeline-elements-all-flink/src/main/java/org/apache/streampipes/pe/flink/config/ConfigKeys.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.flink.config;
+
+public class ConfigKeys {
+ final static String HOST = "SP_HOST";
+ final static String PORT = "SP_PORT";
+ final static String SERVICE_NAME = "SP_SERVICE_NAME";
+ final static String FLINK_HOST = "SP_FLINK_HOST";
+ final static String FLINK_PORT = "SP_FLINK_PORT";
+ final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
+}