You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ob...@apache.org on 2022/02/13 23:12:09 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-474] Update Apache Flink archetype

This is an automated email from the ASF dual-hosted git repository.

obermeier pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1c28789  [STREAMPIPES-474] Update Apache Flink archetype
1c28789 is described below

commit 1c28789312c6fd88bdc51fbaba16e3c8afb6fbbc
Author: Stefan Obermeier <ob...@apache.org>
AuthorDate: Sun Feb 13 23:55:40 2022 +0100

    [STREAMPIPES-474] Update Apache Flink archetype
---
 .../src/main/resources/archetype-resources/pom.xml |  5 ++
 .../archetype-resources/src/main/java/Init.java    | 44 ++++++++-----
 .../src/main/java/config/Config.java               | 77 ----------------------
 .../src/main/java/config/ConfigKeys.java           | 14 ++--
 .../__classNamePrefix__Controller.java             |  6 +-
 .../__classNamePrefix__Parameters.java             |  2 +
 .../__classNamePrefix__Program.java                | 46 ++++++++-----
 7 files changed, 78 insertions(+), 116 deletions(-)

diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
index 1f8268a..1dd8781 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
@@ -81,6 +81,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-sdk-bundle</artifactId>
+            <version>${sp.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-sdk</artifactId>
             <version>${sp.version}</version>
         </dependency>
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
index fe3fd38..8b05fdf 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -23,8 +23,11 @@ package ${package};
 
 import org.apache.streampipes.container.init.DeclarersSingleton;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
+import org.apache.streampipes.container.model.SpServiceDefinition;
+import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
+
+import ${package}.config.ConfigKeys;
 
-import ${package}.config.Config;
 import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller;
 
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
@@ -38,20 +41,31 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 public class Init extends StandaloneModelSubmitter {
 
   public static void main(String[] args) throws Exception {
-    DeclarersSingleton.getInstance()
-            .add(new ${classNamePrefix}Controller());
-
-    DeclarersSingleton.getInstance().registerDataFormats(
-            new JsonDataFormatFactory(),
-            new CborDataFormatFactory(),
-            new SmileDataFormatFactory(),
-            new FstDataFormatFactory());
-
-    DeclarersSingleton.getInstance().registerProtocols(
-            new SpKafkaProtocolFactory(),
-            new SpMqttProtocolFactory(),
-            new SpJmsProtocolFactory());
+	  new Init().init();
+  }
 
-    new Init().init(Config.INSTANCE);
+  @Override
+  public SpServiceDefinition provideServiceDefinition() {
+    return SpServiceDefinitionBuilder.create("${package}",
+              "Apache Flink processor",
+              "",
+              8090)
+	  .registerPipelineElement(new ${classNamePrefix}Controller())
+      .registerMessagingFormats(
+              new JsonDataFormatFactory(),
+              new CborDataFormatFactory(),
+              new SmileDataFormatFactory(),
+              new FstDataFormatFactory())
+      .registerMessagingProtocols(
+              new SpKafkaProtocolFactory(),
+              new SpJmsProtocolFactory(),
+              new SpMqttProtocolFactory())
+      .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+      .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+      .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+      .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
+      .addConfig(ConfigKeys.SERVICE_NAME, "sp fft stream analytics metrics", "Data processor service name")
+      .addConfig(ConfigKeys.HOST, "${artifactId}", "Data processor host")
+      .build();
   }
 }
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
deleted file mode 100644
index a73a975..0000000
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#set( $symbol_pound = '#' )
-#set( $symbol_dollar = '$' )
-#set( $symbol_escape = '\' )
-#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
-
-package ${package}.config;
-
-import org.apache.streampipes.config.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum Config implements PeConfig {
-  INSTANCE;
-
-  private SpConfig config;
-  public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-  private final static String SERVICE_ID = "pe/${package}.processor.flink";
-
-  Config() {
-    config = SpConfig.getSpConfig(SERVICE_ID);
-    config.register(ConfigKeys.HOST, "${artifactId}", "Data processor host");
-    config.register(ConfigKeys.PORT, 8090, "Data processor port");
-    config.register(ConfigKeys.SERVICE_NAME, "${svc_name}", "Data processor service name");
-    config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
-    config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
-    config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-  }
-
-  public String getFlinkHost() {
-    return config.getString(ConfigKeys.FLINK_HOST);
-  }
-
-  public int getFlinkPort() {
-    return config.getInteger(ConfigKeys.FLINK_PORT);
-  }
-
-  public boolean getFlinkDebug() {
-    return config.getBoolean(ConfigKeys.FLINK_DEBUG);
-  }
-
-  @Override
-  public String getHost() {
-    return config.getString(ConfigKeys.HOST);
-  }
-
-  @Override
-  public int getPort() {
-    return config.getInteger(ConfigKeys.PORT);
-  }
-
-  @Override
-  public String getId() {
-    return SERVICE_ID;
-  }
-
-  @Override
-  public String getName() {
-    return config.getString(ConfigKeys.SERVICE_NAME);
-  }
-
-}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index e3cc6f0..0dc3eb9 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -22,10 +22,12 @@
 package ${package}.config;
 
 public class ConfigKeys {
-  final static String HOST = "SP_HOST";
-  final static String PORT = "SP_PORT";
-  final static String SERVICE_NAME = "SP_SERVICE_NAME";
-  final static String FLINK_HOST = "SP_FLINK_HOST";
-  final static String FLINK_PORT = "SP_FLINK_PORT";
-  final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
+  public static final String HOST = "SP_HOST";
+  public static final String PORT = "SP_PORT";
+  public static final String SERVICE_NAME = "SP_SERVICE_NAME";
+  public static final String FLINK_HOST = "SP_FLINK_HOST";
+  public static final String FLINK_PORT = "SP_FLINK_PORT";
+  public static final String FLINK_DEBUG = "SP_FLINK_DEBUG";
+  public static final String DEBUG = "SP_FLINK_DEBUG";
+  public static final String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
 }
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
index 7efd741..981a6a8 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
@@ -21,10 +21,10 @@
 
 package ${package}.pe.processor.${packageName};
 
-import ${package}.config.Config;
-
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -71,6 +71,6 @@ public class ${classNamePrefix}Controller extends FlinkDataProcessorDeclarer<${c
 		String exampleText = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
 		${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleText);
 
-		return new ${classNamePrefix}Program(params, Config.INSTANCE.getFlinkDebug());
+		return new ${classNamePrefix}Program(params, configExtractor, streamPipesClient);
 	}
 }
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
index 8943e29..d86ea35 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
@@ -27,6 +27,8 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
 public class ${classNamePrefix}Parameters extends EventProcessorBindingParams {
 
   private String exampleText;
+  private static final long serialVersionUID = 1L;
+
 
   public ${classNamePrefix}Parameters(DataProcessorInvocation graph, String exampleText) {
     super(graph);
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
index 63f8477..c2ae8fe 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
@@ -21,12 +21,23 @@
 
 package ${package}.pe.processor.${packageName};
 
-import ${package}.config.Config;
+import ${package}.config.ConfigKeys;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
+import org.myorga.config.ConfigKeys;
+import org.myorga.pe.processor.example.MyFlinkProcessor;
+import org.myorga.pe.processor.example.MyFlinkProcessorParameters;
+import org.myorga.pe.processor.example.MyFlinkProcessorProgram;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+
 
 import java.io.Serializable;
 
@@ -35,22 +46,27 @@ public class ${classNamePrefix}Program extends FlinkDataProcessorRuntime<${class
   private static final long serialVersionUID = 1L;
   private final ${classNamePrefix}Parameters params;
 
-  public ${classNamePrefix}Program(${classNamePrefix}Parameters params, boolean debug) {
-    super(params, debug);
-    this.params = params;
-  }
+  public ${classNamePrefix}Program(${classNamePrefix}Parameters params,
+		  ConfigExtractor configExtractor,
+		  StreamPipesClient streamPipesClient)
+	{
+	  super(params, configExtractor, streamPipesClient);
+	  this.params = params;
+	}
 
-  @Override
-  protected FlinkDeploymentConfig getDeploymentConfig() {
-    return new FlinkDeploymentConfig(Config.JAR_FILE,
-            Config.INSTANCE.getFlinkHost(), Config.INSTANCE.getFlinkPort());
-  }
 
   @Override
-  protected DataStream<Event> getApplicationLogic(
-        DataStream<Event>... messageStream) {
+    protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+		SpConfig config = configExtractor.getConfig();
+		return new FlinkDeploymentConfig(config.getString(ConfigKeys.FLINK_JAR_FILE_LOC),
+				config.getString(ConfigKeys.FLINK_HOST),
+				config.getInteger(ConfigKeys.FLINK_PORT),
+				config.getBoolean(ConfigKeys.DEBUG));
+	}
+	@Override
+    protected DataStream<Event> getApplicationLogic(DataStream<Event>... dataStreams){
 
-    return messageStream[0]
-        .flatMap(new ${classNamePrefix}(this.params.getExampleText()));
-  }
+		return dataStreams[0]
+				.flatMap(new ${classNamePrefix}(this.params.getExampleText()));
+	}
 }