You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ob...@apache.org on 2022/02/13 23:12:09 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-474] Update Apache Flink archetype
This is an automated email from the ASF dual-hosted git repository.
obermeier pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 1c28789 [STREAMPIPES-474] Update Apache Flink archetype
1c28789 is described below
commit 1c28789312c6fd88bdc51fbaba16e3c8afb6fbbc
Author: Stefan Obermeier <ob...@apache.org>
AuthorDate: Sun Feb 13 23:55:40 2022 +0100
[STREAMPIPES-474] Update Apache Flink archetype
---
.../src/main/resources/archetype-resources/pom.xml | 5 ++
.../archetype-resources/src/main/java/Init.java | 44 ++++++++-----
.../src/main/java/config/Config.java | 77 ----------------------
.../src/main/java/config/ConfigKeys.java | 14 ++--
.../__classNamePrefix__Controller.java | 6 +-
.../__classNamePrefix__Parameters.java | 2 +
.../__classNamePrefix__Program.java | 46 ++++++++-----
7 files changed, 78 insertions(+), 116 deletions(-)
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
index 1f8268a..1dd8781 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
@@ -81,6 +81,11 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-sdk-bundle</artifactId>
+ <version>${sp.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-sdk</artifactId>
<version>${sp.version}</version>
</dependency>
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
index fe3fd38..8b05fdf 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -23,8 +23,11 @@ package ${package};
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
+
+import ${package}.config.ConfigKeys;
-import ${package}.config.Config;
import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
@@ -38,20 +41,31 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
public class Init extends StandaloneModelSubmitter {
public static void main(String[] args) throws Exception {
- DeclarersSingleton.getInstance()
- .add(new ${classNamePrefix}Controller());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
-
- DeclarersSingleton.getInstance().registerProtocols(
- new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
+ new Init().init();
+ }
- new Init().init(Config.INSTANCE);
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("${package}",
+ "Apache Flink processor",
+ "",
+ 8090)
+ .registerPipelineElement(new ${classNamePrefix}Controller())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
+ new SpKafkaProtocolFactory(),
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+ .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+ .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+ .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
+ .addConfig(ConfigKeys.SERVICE_NAME, "sp fft stream analytics metrics", "Data processor service name")
+ .addConfig(ConfigKeys.HOST, "${artifactId}", "Data processor host")
+ .build();
}
}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
deleted file mode 100644
index a73a975..0000000
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#set( $symbol_pound = '#' )
-#set( $symbol_dollar = '$' )
-#set( $symbol_escape = '\' )
-#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
-
-package ${package}.config;
-
-import org.apache.streampipes.config.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum Config implements PeConfig {
- INSTANCE;
-
- private SpConfig config;
- public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
- private final static String SERVICE_ID = "pe/${package}.processor.flink";
-
- Config() {
- config = SpConfig.getSpConfig(SERVICE_ID);
- config.register(ConfigKeys.HOST, "${artifactId}", "Data processor host");
- config.register(ConfigKeys.PORT, 8090, "Data processor port");
- config.register(ConfigKeys.SERVICE_NAME, "${svc_name}", "Data processor service name");
- config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
- config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
- config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
- }
-
- public String getFlinkHost() {
- return config.getString(ConfigKeys.FLINK_HOST);
- }
-
- public int getFlinkPort() {
- return config.getInteger(ConfigKeys.FLINK_PORT);
- }
-
- public boolean getFlinkDebug() {
- return config.getBoolean(ConfigKeys.FLINK_DEBUG);
- }
-
- @Override
- public String getHost() {
- return config.getString(ConfigKeys.HOST);
- }
-
- @Override
- public int getPort() {
- return config.getInteger(ConfigKeys.PORT);
- }
-
- @Override
- public String getId() {
- return SERVICE_ID;
- }
-
- @Override
- public String getName() {
- return config.getString(ConfigKeys.SERVICE_NAME);
- }
-
-}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index e3cc6f0..0dc3eb9 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -22,10 +22,12 @@
package ${package}.config;
public class ConfigKeys {
- final static String HOST = "SP_HOST";
- final static String PORT = "SP_PORT";
- final static String SERVICE_NAME = "SP_SERVICE_NAME";
- final static String FLINK_HOST = "SP_FLINK_HOST";
- final static String FLINK_PORT = "SP_FLINK_PORT";
- final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
+ public static final String HOST = "SP_HOST";
+ public static final String PORT = "SP_PORT";
+ public static final String SERVICE_NAME = "SP_SERVICE_NAME";
+ public static final String FLINK_HOST = "SP_FLINK_HOST";
+ public static final String FLINK_PORT = "SP_FLINK_PORT";
+ public static final String FLINK_DEBUG = "SP_FLINK_DEBUG";
+ public static final String DEBUG = "SP_FLINK_DEBUG";
+ public static final String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
index 7efd741..981a6a8 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
@@ -21,10 +21,10 @@
package ${package}.pe.processor.${packageName};
-import ${package}.config.Config;
-
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -71,6 +71,6 @@ public class ${classNamePrefix}Controller extends FlinkDataProcessorDeclarer<${c
String exampleText = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleText);
- return new ${classNamePrefix}Program(params, Config.INSTANCE.getFlinkDebug());
+ return new ${classNamePrefix}Program(params, configExtractor, streamPipesClient);
}
}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
index 8943e29..d86ea35 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
@@ -27,6 +27,8 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
public class ${classNamePrefix}Parameters extends EventProcessorBindingParams {
private String exampleText;
+ private static final long serialVersionUID = 1L;
+
public ${classNamePrefix}Parameters(DataProcessorInvocation graph, String exampleText) {
super(graph);
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
index 63f8477..c2ae8fe 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
@@ -21,12 +21,23 @@
package ${package}.pe.processor.${packageName};
-import ${package}.config.Config;
+import ${package}.config.ConfigKeys;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
+import org.myorga.config.ConfigKeys;
+import org.myorga.pe.processor.example.MyFlinkProcessor;
+import org.myorga.pe.processor.example.MyFlinkProcessorParameters;
+import org.myorga.pe.processor.example.MyFlinkProcessorProgram;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+
import java.io.Serializable;
@@ -35,22 +46,27 @@ public class ${classNamePrefix}Program extends FlinkDataProcessorRuntime<${class
private static final long serialVersionUID = 1L;
private final ${classNamePrefix}Parameters params;
- public ${classNamePrefix}Program(${classNamePrefix}Parameters params, boolean debug) {
- super(params, debug);
- this.params = params;
- }
+ public ${classNamePrefix}Program(${classNamePrefix}Parameters params,
+ ConfigExtractor configExtractor,
+ StreamPipesClient streamPipesClient)
+ {
+ super(params, configExtractor, streamPipesClient);
+ this.params = params;
+ }
- @Override
- protected FlinkDeploymentConfig getDeploymentConfig() {
- return new FlinkDeploymentConfig(Config.JAR_FILE,
- Config.INSTANCE.getFlinkHost(), Config.INSTANCE.getFlinkPort());
- }
@Override
- protected DataStream<Event> getApplicationLogic(
- DataStream<Event>... messageStream) {
+ protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+ SpConfig config = configExtractor.getConfig();
+ return new FlinkDeploymentConfig(config.getString(ConfigKeys.FLINK_JAR_FILE_LOC),
+ config.getString(ConfigKeys.FLINK_HOST),
+ config.getInteger(ConfigKeys.FLINK_PORT),
+ config.getBoolean(ConfigKeys.DEBUG));
+ }
+ @Override
+ protected DataStream<Event> getApplicationLogic(DataStream<Event>... dataStreams){
- return messageStream[0]
- .flatMap(new ${classNamePrefix}(this.params.getExampleText()));
- }
+ return dataStreams[0]
+ .flatMap(new ${classNamePrefix}(this.params.getExampleText()));
+ }
}