You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/09/05 07:42:07 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-221]
Update maven archetypes for pipeline elements
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 7f875f3 [STREAMPIPES-221] Update maven archetypes for pipeline elements
7f875f3 is described below
commit 7f875f38000b39b8e458f41aed4f20229616a646
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Sat Sep 5 09:41:40 2020 +0200
[STREAMPIPES-221] Update maven archetypes for pipeline elements
---
.../META-INF/maven/archetype-metadata.xml | 20 ++----
.../src/main/resources/archetype-resources/pom.xml | 4 +-
.../src/main/java/{main => }/Init.java | 12 ++--
.../src/main/java/config/Config.java | 56 +++++-----------
.../src/main/java/config/ConfigKeys.java | 7 +-
.../__packageName__/__classNamePrefix__.java | 13 ++--
.../__classNamePrefix__Controller.java | 23 +++----
.../__classNamePrefix__Parameters.java | 2 -
.../__classNamePrefix__Program.java | 9 ++-
.../documentation.md | 20 +++---
.../icon.png | Bin 4836 -> 13563 bytes
.../strings.en | 2 +-
.../META-INF/maven/archetype-metadata.xml | 19 +-----
.../src/main/resources/archetype-resources/pom.xml | 4 +-
.../src/main/java/{main => }/Init.java | 16 ++---
.../src/main/java/config/Config.java | 41 +++---------
.../src/main/java/config/ConfigKeys.java | 4 +-
.../__packageName__/__classNamePrefix__.java | 20 +++---
.../__classNamePrefix__Controller.java | 15 +++--
.../__classNamePrefix__Parameters.java | 3 +-
.../documentation.md | 20 +++---
.../icon.png | Bin 4836 -> 13563 bytes
.../strings.en | 2 +-
.../META-INF/maven/archetype-metadata.xml | 15 ++---
.../src/main/resources/archetype-resources/pom.xml | 4 +-
.../archetype-resources/src/main/java}/Init.java | 24 +++----
.../src/main/java/config/Config.java | 55 +++++----------
.../src/main/java/config/ConfigKeys.java | 6 +-
.../__classNamePrefix__Controller.java | 20 +++---
.../__classNamePrefix__Parameters.java | 24 +++++--
.../__classNamePrefix__Program.java | 5 ++
.../documentation.md | 19 +++---
.../__package__.pe.sink.__packageName__/icon.png | Bin 4836 -> 13563 bytes
.../__package__.pe.sink.__packageName__/strings.en | 12 +++-
.../META-INF/maven/archetype-metadata.xml | 17 ++---
.../src/main/resources/archetype-resources/pom.xml | 4 +-
.../archetype-resources/src/main/java}/Init.java | 27 +++-----
.../src/main/java/config/Config.java | 41 +++---------
.../src/main/java/config/ConfigKeys.java | 2 -
.../sink/__packageName__/__classNamePrefix__.java | 23 ++++---
.../__classNamePrefix__Controller.java | 19 +++---
.../__classNamePrefix__Parameters.java | 24 +++++--
.../documentation.md | 19 +++---
.../__package__.pe.sink.__packageName__/icon.png | Bin 4836 -> 13563 bytes
.../__package__.pe.sink.__packageName__/strings.en | 12 +++-
.../META-INF/maven/archetype-metadata.xml | 13 ++--
.../main/resources/archetype-resources/Dockerfile | 7 +-
.../resources/archetype-resources/development/env | 3 +-
.../src/main/resources/archetype-resources/pom.xml | 4 +-
.../src/main/java/{main => }/Init.java | 5 +-
.../src/main/java/config/Config.java | 40 ++++-------
.../src/main/java/config/ConfigKeys.java | 4 +-
.../__packageName__/__classNamePrefix__Stream.java | 52 ---------------
.../DataSource.java | 4 +-
.../__classNamePrefix__Stream.java | 74 +++++++++++++++++++++
.../documentation.md | 20 ++----
.../__package__.pe.source.__packageName__/icon.png | Bin 0 -> 13563 bytes
.../strings.en | 2 +
58 files changed, 392 insertions(+), 520 deletions(-)
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
index 667d6e5..8b8a694 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -36,15 +36,9 @@
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory></directory>
- <includes>
- <include>Dockerfile</include>
- </includes>
- </fileSet>
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
<directory/>
<includes>
- <include>deployment/docker-compose.yml</include>
+ <include>Dockerfile</include>
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
@@ -54,16 +48,10 @@
<include>**/*.md</include>
</includes>
</fileSet>
- <!--<binaries>-->
- <!--<directory>src/main/resources</directory>-->
- <!--<includes>-->
- <!--<include>**/*.png</include>-->
- <!--</includes>-->
- <!--</binaries>-->
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
+ <fileSet filtered="false" packaged="false" >
+ <directory>src/main/resources</directory>
<includes>
- <include>deployment/system</include>
+ <include>**/*.png</include>
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
index a443207..92a4576 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
@@ -25,7 +25,7 @@
<version>${version}</version>
<properties>
- <sp.version>0.67.0-SNAPSHOT</sp.version>
+ <sp.version>0.68.0-SNAPSHOT</sp.version>
</properties>
<dependencies>
@@ -173,7 +173,7 @@
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>${package}.main.Init</mainClass>
+ <mainClass>${package}.Init</mainClass>
</transformer>
</transformers>
<filters>
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
similarity index 90%
rename from archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java
rename to archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
index 405c926..9b9c928 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -15,12 +15,11 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
-package ${package}.main;
+package ${package};
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
@@ -41,17 +40,16 @@ public class Init extends StandaloneModelSubmitter {
DeclarersSingleton.getInstance()
.add(new ${classNamePrefix}Controller());
- DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
+ DeclarersSingleton.getInstance().registerDataFormats(
+ new JsonDataFormatFactory(),
new CborDataFormatFactory(),
new SmileDataFormatFactory(),
new FstDataFormatFactory());
- DeclarersSingleton.getInstance().registerProtocols(new SpKafkaProtocolFactory(),
+ DeclarersSingleton.getInstance().registerProtocols(
+ new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory());
new Init().init(Config.INSTANCE);
-
}
-
-
}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
index 8f7c9f2..a73a975 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
@@ -15,10 +15,10 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
package ${package}.config;
@@ -26,39 +26,20 @@ import org.apache.streampipes.config.SpConfig;
import org.apache.streampipes.container.model.PeConfig;
public enum Config implements PeConfig {
-
INSTANCE;
private SpConfig config;
public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
- private final static String SERVICE_ID = "pe/${package}";
-
+ private final static String SERVICE_ID = "pe/${package}.processor.flink";
Config() {
config = SpConfig.getSpConfig(SERVICE_ID);
-
- config.register(ConfigKeys.HOST, "${artifactId}", "Hostname for the pe mixed flink component");
- config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
- config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
- config.register(ConfigKeys.FLINK_PORT, 6123, "Port for the flink cluster");
- config.register(ConfigKeys.ICON_HOST, "backend", "Hostname for the icon host");
- config.register(ConfigKeys.ICON_PORT, 80, "Port for the icons in nginx");
-
- config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
- config.register(ConfigKeys.SERVICE_NAME, "${packageName}", "The name of the service");
-
- }
-
- @Override
- public String getHost() {
- return config.getString(ConfigKeys.HOST);
- }
-
- @Override
- public int getPort() {
- return config.getInteger(ConfigKeys.PORT);
+ config.register(ConfigKeys.HOST, "${artifactId}", "Data processor host");
+ config.register(ConfigKeys.PORT, 8090, "Data processor port");
+ config.register(ConfigKeys.SERVICE_NAME, "${svc_name}", "Data processor service name");
+ config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
+ config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
+ config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
}
public String getFlinkHost() {
@@ -69,23 +50,18 @@ public enum Config implements PeConfig {
return config.getInteger(ConfigKeys.FLINK_PORT);
}
- public static final String iconBaseUrl = "http://" + Config.INSTANCE.getIconHost() + ":" +
- Config.INSTANCE.getIconPort() + "/assets/img/pe_icons";
-
- public static final String getIconUrl(String pictureName) {
- return iconBaseUrl + "/" + pictureName + ".png";
- }
-
- public String getIconHost() {
- return config.getString(ConfigKeys.ICON_HOST);
+ public boolean getFlinkDebug() {
+ return config.getBoolean(ConfigKeys.FLINK_DEBUG);
}
- public int getIconPort() {
- return config.getInteger(ConfigKeys.ICON_PORT);
+ @Override
+ public String getHost() {
+ return config.getString(ConfigKeys.HOST);
}
- public boolean getDebug() {
- return config.getBoolean(ConfigKeys.DEBUG);
+ @Override
+ public int getPort() {
+ return config.getInteger(ConfigKeys.PORT);
}
@Override
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index 291de17..e3cc6f0 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -15,7 +15,6 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
@@ -25,10 +24,8 @@ package ${package}.config;
public class ConfigKeys {
final static String HOST = "SP_HOST";
final static String PORT = "SP_PORT";
+ final static String SERVICE_NAME = "SP_SERVICE_NAME";
final static String FLINK_HOST = "SP_FLINK_HOST";
final static String FLINK_PORT = "SP_FLINK_PORT";
- final static String ICON_HOST = "SP_ICON_HOST";
- final static String ICON_PORT = "SP_ICON_PORT";
- final static String SERVICE_NAME = "SP_SERVICE_NAME";
- final static String DEBUG = "SP_FLINK_DEBUG";
+ final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java
index 8c78329..f61fc83 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java
@@ -15,7 +15,6 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
@@ -29,9 +28,15 @@ import org.apache.streampipes.model.runtime.Event;
public class ${classNamePrefix} implements FlatMapFunction<Event, Event> {
- @Override
- public void flatMap(Event in, Collector<Event> out) throws Exception {
+ private String exampleText;
- out.collect(in);
+ public ${classNamePrefix}(String exampleText) {
+ this.exampleText = exampleText;
+ }
+
+ @Override
+ public void flatMap(Event event, Collector<Event> out) {
+ event.addField("appendedText", exampleText);
+ out.collect(event);
}
}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
index d09f894..d233f97 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
@@ -15,7 +15,6 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
@@ -24,6 +23,8 @@ package ${package}.pe.processor.${packageName};
import ${package}.config.Config;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -33,15 +34,12 @@ import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.helpers.SupportedFormats;
-import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
-public class ${classNamePrefix}Controller extends
- FlinkDataProcessorDeclarer<${classNamePrefix}Parameters> {
+public class ${classNamePrefix}Controller extends FlinkDataProcessorDeclarer<${classNamePrefix}Parameters> {
private static final String EXAMPLE_KEY = "example-key";
@@ -55,10 +53,11 @@ public class ${classNamePrefix}Controller extends
.create()
.requiredProperty(EpRequirements.anyProperty())
.build())
- .supportedFormats(SupportedFormats.jsonFormat())
- .supportedProtocols(SupportedProtocols.kafka())
- .outputStrategy(OutputStrategies.keep())
.requiredTextParameter(Labels.withId(EXAMPLE_KEY))
+ .outputStrategy(OutputStrategies.append(
+ PrimitivePropertyBuilder.create(
+ Datatypes.String, "appendedText")
+ .build()))
.build();
}
@@ -66,11 +65,9 @@ public class ${classNamePrefix}Controller extends
public FlinkDataProcessorRuntime<${classNamePrefix}Parameters> getRuntime(DataProcessorInvocation
graph, ProcessingElementParameterExtractor extractor) {
- String exampleString = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
-
- ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleString);
+ String exampleText = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
+ ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleText);
- return new ${classNamePrefix}Program(params, Config.INSTANCE.getDebug());
+ return new ${classNamePrefix}Program(params, Config.INSTANCE.getFlinkDebug());
}
-
}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
index 8881c62..8943e29 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
@@ -15,7 +15,6 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
@@ -37,5 +36,4 @@ public class ${classNamePrefix}Parameters extends EventProcessorBindingParams {
public String getExampleText() {
return exampleText;
}
-
}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
index b0838ca..63f8477 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
@@ -15,7 +15,6 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
@@ -31,14 +30,14 @@ import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
import java.io.Serializable;
-public class ${classNamePrefix}Program extends
- FlinkDataProcessorRuntime<${classNamePrefix}Parameters>
-implements Serializable {
+public class ${classNamePrefix}Program extends FlinkDataProcessorRuntime<${classNamePrefix}Parameters> implements Serializable {
private static final long serialVersionUID = 1L;
+ private final ${classNamePrefix}Parameters params;
public ${classNamePrefix}Program(${classNamePrefix}Parameters params, boolean debug) {
super(params, debug);
+ this.params = params;
}
@Override
@@ -52,6 +51,6 @@ implements Serializable {
DataStream<Event>... messageStream) {
return messageStream[0]
- .flatMap(new ${classNamePrefix}());
+ .flatMap(new ${classNamePrefix}(this.params.getExampleText()));
}
}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md
index d91bf67..1e0e266 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md
@@ -15,8 +15,9 @@
~ limitations under the License.
~
-->
+#set( $double_pound = '##' )
-## ${classNamePrefix}
+${double_pound} ${classNamePrefix}
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -24,19 +25,18 @@
***
-## Description
-
-Describe your new processor here!
+${double_pound} Description
+Describe your new data processor here!
***
-## Required input
-What are the input requirements of your processor?
+${double_pound} Required input
+What are the input requirements of your data processor?
***
-## Configuration
-What are the configurations a user has to provide
+${double_pound} Configuration
+What are the configurations a user has to provide?
-## Output
-How do the events your processor emits look like.
+${double_pound} Output
+How do the events your data processor emits look like?
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/icon.png b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/icon.png
index 5666a68..05d4b1d 100644
Binary files a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/icon.png and b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/icon.png differ
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/strings.en b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/strings.en
index 9753ca1..d23ce26 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/strings.en
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/strings.en
@@ -1,5 +1,5 @@
${package}.pe.processor.${packageName}.title=${classNamePrefix}
-${package}.pe.processor.${packageName}.description=Description of processor
+${package}.pe.processor.${packageName}.description=Description of data processor
example-key.title=Example Text Parameter
example-key.description=Example Text Parameter Description
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml
index 5e6d936..8b8a694 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -42,29 +42,16 @@
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
- <includes>
- <include>deployment/docker-compose.yml</include>
- </includes>
- </fileSet>
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
<directory>src/main/resources</directory>
<includes>
- <!--<include>**/*.png</include>-->
<include>**/*.en</include>
<include>**/*.md</include>
</includes>
</fileSet>
- <!--<binaries filtered="true" packaged="false" >-->
- <!--<outputDirectory>src/main/resources</outputDirectory>-->
- <!--<includes>-->
- <!--<include>**/icon.png</include>-->
- <!--</includes>-->
- <!--</binaries>-->
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
+ <fileSet filtered="false" packaged="false" >
+ <directory>src/main/resources</directory>
<includes>
- <include>deployment/system</include>
+ <include>**/*.png</include>
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/pom.xml
index b29497b..88bce5b 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/pom.xml
@@ -25,7 +25,7 @@
<version>${version}</version>
<properties>
- <sp.version>0.67.0-SNAPSHOT</sp.version>
+ <sp.version>0.68.0-SNAPSHOT</sp.version>
</properties>
<dependencies>
@@ -98,7 +98,7 @@
<goal>repackage</goal>
</goals>
<configuration>
- <mainClass>${package}.main.Init</mainClass>
+ <mainClass>${package}.Init</mainClass>
</configuration>
</execution>
</executions>
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
similarity index 82%
rename from archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java
rename to archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
index b35b0fe..654ee35 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -19,7 +19,7 @@
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
-package ${package}.main;
+package ${package};
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
@@ -35,24 +35,20 @@ import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller;
public class Init extends StandaloneModelSubmitter {
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
DeclarersSingleton.getInstance()
.add(new ${classNamePrefix}Controller());
- DeclarersSingleton.getInstance().setPort(Config.INSTANCE.getPort());
- DeclarersSingleton.getInstance().setHostName(Config.INSTANCE.getHost());
-
- DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
+ DeclarersSingleton.getInstance().registerDataFormats(
+ new JsonDataFormatFactory(),
new CborDataFormatFactory(),
new SmileDataFormatFactory(),
new FstDataFormatFactory());
- DeclarersSingleton.getInstance().registerProtocols(new SpKafkaProtocolFactory(),
+ DeclarersSingleton.getInstance().registerProtocols(
+ new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory());
new Init().init(Config.INSTANCE);
-
}
-
-
}
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java
index 84e78b0..c5f3fcd 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java
@@ -19,6 +19,7 @@
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
package ${package}.config;
import org.apache.streampipes.config.SpConfig;
@@ -27,53 +28,28 @@ import org.apache.streampipes.container.model.PeConfig;
import static ${package}.config.ConfigKeys.*;
public enum Config implements PeConfig {
-
INSTANCE;
private SpConfig config;
-
- public final static String serverUrl;
- public final static String iconBaseUrl;
-
- private final static String SERVICE_ID= "pe/${package}";
+ private final static String SERVICE_ID= "pe/${package}.processor.jvm";
Config() {
- config = SpConfig.getSpConfig("pe/${package}");
-
- config.register(HOST, "${artifactId}", "Hostname for the pe sinks");
- config.register(PORT, 8090, "Port for the pe sinks");
-
- config.register(ICON_HOST, "backend", "Hostname for the icon host");
- config.register(ICON_PORT, 80, "Port for the icons in nginx");
-
- config.register(SERVICE_NAME, "${packageName}", "The name of the service");
- }
-
- static {
- serverUrl = Config.INSTANCE.getHost() + ":" + Config.INSTANCE.getPort();
- iconBaseUrl = "http://" + Config.INSTANCE.getIconHost() + ":" + Config.INSTANCE.getIconPort() + "/assets/img/pe_icons";
- }
-
- public static final String getIconUrl(String pictureName) {
- return iconBaseUrl + "/" + pictureName + ".png";
+ config = SpConfig.getSpConfig(SERVICE_ID);
+ config.register(HOST, "${artifactId}", "Data processor host");
+ config.register(PORT, 8090, "Data processor port");
+ config.register(SERVICE_NAME, "${svc_name}", "Data processor service name");
}
+ @Override
public String getHost() {
return config.getString(HOST);
}
+ @Override
public int getPort() {
return config.getInteger(PORT);
}
- public String getIconHost() {
- return config.getString(ICON_HOST);
- }
-
- public int getIconPort() {
- return config.getInteger(ICON_PORT);
- }
-
@Override
public String getId() {
return SERVICE_ID;
@@ -83,5 +59,4 @@ public enum Config implements PeConfig {
public String getName() {
return config.getString(SERVICE_NAME);
}
-
}
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index 8e9477a..dc36c28 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -15,16 +15,14 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+
package ${package}.config;
public class ConfigKeys {
final static String HOST = "SP_HOST";
final static String PORT = "SP_PORT";
- final static String ICON_HOST = "SP_ICON_HOST";
- final static String ICON_PORT = "SP_ICON_PORT";
final static String SERVICE_NAME = "SP_SERVICE_NAME";
}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java
index fca5c45..a48248e 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__.java
@@ -15,39 +15,35 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
package ${package}.pe.processor.${packageName};
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
-import org.slf4j.Logger;
-
-public class ${classNamePrefix} implements
- EventProcessor<${classNamePrefix}Parameters> {
+public class ${classNamePrefix} implements EventProcessor<${classNamePrefix}Parameters> {
- private static Logger LOG;
+ private String exampleText;
@Override
public void onInvocation(${classNamePrefix}Parameters parameters,
- SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
+ SpOutputCollector out, EventProcessorRuntimeContext ctx) {
+ this.exampleText = parameters.getExampleText();
}
@Override
- public void onEvent(Event event, SpOutputCollector out) throws SpRuntimeException {
-
+ public void onEvent(Event event, SpOutputCollector out) {
+ event.addField("appendedText", exampleText);
+ out.collect(event);
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onDetach() {
}
}
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
index 8126088..927d23c 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
@@ -24,16 +24,16 @@ package ${package}.pe.processor.${packageName};
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.helpers.SupportedFormats;
-import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
@@ -52,7 +52,10 @@ public class ${classNamePrefix}Controller extends StandaloneEventProcessingDecla
.requiredProperty(EpRequirements.anyProperty())
.build())
.requiredTextParameter(Labels.withId(EXAMPLE_KEY))
- .outputStrategy(OutputStrategies.keep())
+ .outputStrategy(OutputStrategies.append(
+ PrimitivePropertyBuilder.create(
+ Datatypes.String, "appendedText")
+ .build()))
.build();
}
@@ -60,11 +63,9 @@ public class ${classNamePrefix}Controller extends StandaloneEventProcessingDecla
public ConfiguredEventProcessor<${classNamePrefix}Parameters> onInvocation
(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
- String exampleString = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
-
- ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleString);
+ String exampleText = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
+ ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleText);
return new ConfiguredEventProcessor<>(params, ${classNamePrefix}::new);
}
-
}
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
index 732f6a3..8943e29 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Parameters.java
@@ -15,10 +15,10 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+
package ${package}.pe.processor.${packageName};
import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -36,5 +36,4 @@ public class ${classNamePrefix}Parameters extends EventProcessorBindingParams {
public String getExampleText() {
return exampleText;
}
-
}
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md
index d91bf67..1e0e266 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md
@@ -15,8 +15,9 @@
~ limitations under the License.
~
-->
+#set( $double_pound = '##' )
-## ${classNamePrefix}
+${double_pound} ${classNamePrefix}
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -24,19 +25,18 @@
***
-## Description
-
-Describe your new processor here!
+${double_pound} Description
+Describe your new data processor here!
***
-## Required input
-What are the input requirements of your processor?
+${double_pound} Required input
+What are the input requirements of your data processor?
***
-## Configuration
-What are the configurations a user has to provide
+${double_pound} Configuration
+What are the configurations a user has to provide?
-## Output
-How do the events your processor emits look like.
+${double_pound} Output
+How do the events your data processor emits look like?
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/icon.png b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/icon.png
index 5666a68..05d4b1d 100644
Binary files a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/icon.png and b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/icon.png differ
diff --git a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/strings.en b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/strings.en
index 9753ca1..d23ce26 100644
--- a/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/strings.en
+++ b/archetypes/streampipes-archetype-pe-processors-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/strings.en
@@ -1,5 +1,5 @@
${package}.pe.processor.${packageName}.title=${classNamePrefix}
-${package}.pe.processor.${packageName}.description=Description of processor
+${package}.pe.processor.${packageName}.description=Description of data processor
example-key.title=Example Text Parameter
example-key.description=Example Text Parameter Description
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
index a9ffe06..8b8a694 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -36,7 +36,7 @@
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory></directory>
+ <directory/>
<includes>
<include>Dockerfile</include>
</includes>
@@ -44,21 +44,14 @@
<fileSet filtered="true" packaged="false" encoding="UTF-8">
<directory>src/main/resources</directory>
<includes>
- <!--<include>**/*.png</include>-->
<include>**/*.en</include>
<include>**/*.md</include>
</includes>
</fileSet>
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
- <includes>
- <include>deployment/docker-compose.yml</include>
- </includes>
- </fileSet>
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
+ <fileSet filtered="false" packaged="false" >
+ <directory>src/main/resources</directory>
<includes>
- <include>deployment/system</include>
+ <include>**/*.png</include>
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
index a443207..92a4576 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
@@ -25,7 +25,7 @@
<version>${version}</version>
<properties>
- <sp.version>0.67.0-SNAPSHOT</sp.version>
+ <sp.version>0.68.0-SNAPSHOT</sp.version>
</properties>
<dependencies>
@@ -173,7 +173,7 @@
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>${package}.main.Init</mainClass>
+ <mainClass>${package}.Init</mainClass>
</transformer>
</transformers>
<filters>
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
similarity index 77%
rename from archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java
rename to archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
index f9d44ba..1230a25 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/main/Init.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -19,43 +19,35 @@
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
-package ${package}.main;
+package ${package};
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
-
-import ${package}.config.Config;
-import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller;
-
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
-import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
-import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
+import ${package}.config.Config;
+import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller;
public class Init extends StandaloneModelSubmitter {
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
DeclarersSingleton.getInstance()
.add(new ${classNamePrefix}Controller());
- DeclarersSingleton.getInstance().setPort(Config.INSTANCE.getPort());
- DeclarersSingleton.getInstance().setHostName(Config.INSTANCE.getHost());
-
- DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
+ DeclarersSingleton.getInstance().registerDataFormats(
+ new JsonDataFormatFactory(),
new CborDataFormatFactory(),
new SmileDataFormatFactory(),
new FstDataFormatFactory());
- DeclarersSingleton.getInstance().registerProtocols(new SpKafkaProtocolFactory(),
+ DeclarersSingleton.getInstance().registerProtocols(
+ new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory());
new Init().init(Config.INSTANCE);
-
}
-
-
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
index 38f0e17..16e02a4 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
@@ -15,48 +15,30 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
package ${package}.config;
import org.apache.streampipes.config.SpConfig;
import org.apache.streampipes.container.model.PeConfig;
public enum Config implements PeConfig {
-
INSTANCE;
private SpConfig config;
public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
- private final static String SERVICE_ID = "pe/${package}";
+ private final static String SERVICE_ID = "pe/${package}.sink.flink";
Config() {
config = SpConfig.getSpConfig(SERVICE_ID);
-
- config.register(ConfigKeys.HOST, "${artifactId}", "Hostname for the pe mixed flink component");
- config.register(ConfigKeys.PORT, 8090, "Port for the pe mixed flink component");
- config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Host for the flink cluster");
- config.register(ConfigKeys.FLINK_PORT, 6123, "Port for the flink cluster");
- config.register(ConfigKeys.ICON_HOST, "backend", "Hostname for the icon host");
- config.register(ConfigKeys.ICON_PORT, 80, "Port for the icons in nginx");
-
- config.register(ConfigKeys.DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
-
- config.register(ConfigKeys.SERVICE_NAME, "${packageName}", "The name of the service");
-
- }
-
- @Override
- public String getHost() {
- return config.getString(ConfigKeys.HOST);
- }
-
- @Override
- public int getPort() {
- return config.getInteger(ConfigKeys.PORT);
+ config.register(ConfigKeys.HOST, "${artifactId}", "Data sink host");
+ config.register(ConfigKeys.PORT, 8090, "Data sink port");
+ config.register(ConfigKeys.SERVICE_NAME, "${svc_name}", "Data sink service name");
+ config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
+ config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
+ config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
}
public String getFlinkHost() {
@@ -67,23 +49,18 @@ public enum Config implements PeConfig {
return config.getInteger(ConfigKeys.FLINK_PORT);
}
- public static final String iconBaseUrl = "http://" + Config.INSTANCE.getIconHost() + ":" +
- Config.INSTANCE.getIconPort() + "/assets/img/pe_icons";
-
- public static final String getIconUrl(String pictureName) {
- return iconBaseUrl + "/" + pictureName + ".png";
- }
-
- public String getIconHost() {
- return config.getString(ConfigKeys.ICON_HOST);
+ public boolean getFlinkDebug() {
+ return config.getBoolean(ConfigKeys.FLINK_DEBUG);
}
- public int getIconPort() {
- return config.getInteger(ConfigKeys.ICON_PORT);
+ @Override
+ public String getHost() {
+ return config.getString(ConfigKeys.HOST);
}
- public boolean getDebug() {
- return config.getBoolean(ConfigKeys.DEBUG);
+ @Override
+ public int getPort() {
+ return config.getInteger(ConfigKeys.PORT);
}
@Override
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index 2ea894b..e307f0d 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -24,10 +24,8 @@ package ${package}.config;
public class ConfigKeys {
final static String HOST = "SP_HOST";
final static String PORT = "SP_PORT";
+ final static String SERVICE_NAME = "SP_SERVICE_NAME";
final static String FLINK_HOST = "SP_FLINK_HOST";
final static String FLINK_PORT = "SP_FLINK_PORT";
- final static String ICON_HOST = "SP_ICON_HOST";
- final static String ICON_PORT = "SP_ICON_PORT";
- final static String SERVICE_NAME = "SP_SERVICE_NAME";
- final static String DEBUG = "SP_FLINK_DEBUG";
+ final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
index 6aeb5ba..b0bf3ee 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
@@ -15,14 +15,12 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
package ${package}.pe.sink.${packageName};
import ${package}.config.Config;
-
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -40,7 +38,9 @@ import org.apache.streampipes.sdk.utils.Assets;
public class ${classNamePrefix}Controller extends FlinkDataSinkDeclarer<${classNamePrefix}Parameters> {
- private static final String EXAMPLE_KEY = "example-key";
+ private static final String HOST_KEY = "host-key";
+ private static final String PORT_KEY = "port-key";
+ private static final String PASSWORD_KEY = "password-key";
@Override
public DataSinkDescription declareModel() {
@@ -52,18 +52,20 @@ public class ${classNamePrefix}Controller extends FlinkDataSinkDeclarer<${classN
.create()
.requiredProperty(EpRequirements.anyProperty())
.build())
- .requiredTextParameter(Labels.withId(EXAMPLE_KEY))
+ .requiredTextParameter(Labels.withId(HOST_KEY))
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 1234)
+ .requiredSecret(Labels.withId(PASSWORD_KEY))
.build();
}
@Override
public FlinkDataSinkRuntime<${classNamePrefix}Parameters> getRuntime(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
- String exampleString = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
+ String host = extractor.singleValueParameter(HOST_KEY, String.class);
+ int port = extractor.singleValueParameter(PORT_KEY, Integer.class);
+ String password = extractor.secretValue(PASSWORD_KEY);
+ ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, host, port, password);
- ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleString);
-
- return new ${classNamePrefix}Program(params, Config.INSTANCE.getDebug());
+ return new ${classNamePrefix}Program(params, Config.INSTANCE.getFlinkDebug());
}
-
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Parameters.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Parameters.java
index 72e02e5..ff46465 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Parameters.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Parameters.java
@@ -15,10 +15,10 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+
package ${package}.pe.sink.${packageName};
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -26,15 +26,27 @@ import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
public class ${classNamePrefix}Parameters extends EventSinkBindingParams {
- private String exampleText;
+ private String host;
+ private int port;
+ private String password;
- public ${classNamePrefix}Parameters(DataSinkInvocation graph, String exampleText) {
+ public ${classNamePrefix}Parameters(DataSinkInvocation graph, String host, int port, String password) {
super(graph);
- this.exampleText = exampleText;
+ this.host = host;
+ this.port = port;
+ this.password = password;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
}
- public String getExampleText() {
- return exampleText;
+ public String getPassword() {
+ return password;
}
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
index 90b77a5..3fedc44 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
@@ -34,9 +34,11 @@ public class ${classNamePrefix}Program extends FlinkDataSinkRuntime<${classNameP
implements Serializable {
private static final long serialVersionUID = 1L;
+ private final ${classNamePrefix}Parameters params;
public ${classNamePrefix}Program(${classNamePrefix}Parameters params, boolean debug) {
super(params, debug);
+ this.params = params;
}
@Override
@@ -50,5 +52,8 @@ implements Serializable {
DataStream<Event>... convertedStream) {
// TODO add logic here
+ // params.getHost();
+ // params.getPort();
+ // params.getPassword();
}
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/documentation.md b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/documentation.md
index d91bf67..2e51497 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/documentation.md
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/documentation.md
@@ -15,8 +15,9 @@
~ limitations under the License.
~
-->
+#set( $double_pound = '##' )
-## ${classNamePrefix}
+${double_pound} ${classNamePrefix}
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -24,19 +25,15 @@
***
-## Description
-
-Describe your new processor here!
+${double_pound} Description
+Describe your new data sink here!
***
-## Required input
-What are the input requirements of your processor?
+${double_pound} Required input
+What are the input requirements of your data sink?
***
-## Configuration
-What are the configurations a user has to provide
-
-## Output
-How do the events your processor emits look like.
+${double_pound} Configuration
+What are the configurations a user has to provide?
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/icon.png b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/icon.png
index 5666a68..05d4b1d 100644
Binary files a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/icon.png and b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/icon.png differ
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/strings.en b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/strings.en
index e453cf8..054d02f 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/strings.en
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/strings.en
@@ -1,5 +1,11 @@
${package}.pe.sink.${packageName}.title=${classNamePrefix}
-${package}.pe.sink.${packageName}.description=Description of sink
+${package}.pe.sink.${packageName}.description=Description of data sink
-example-key.title=Example Text Parameter
-example-key.description=Example Text Parameter Description
+host-key.title=Host
+host-key.description=IP/DNS of service
+
+port-key.title=Port
+port-key.description=Port of service
+
+password-key.title=Password
+password-key.description=Password to connect
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml
index a9ffe06..af80a73 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -36,7 +36,7 @@
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory></directory>
+ <directory/>
<includes>
<include>Dockerfile</include>
</includes>
@@ -44,21 +44,14 @@
<fileSet filtered="true" packaged="false" encoding="UTF-8">
<directory>src/main/resources</directory>
<includes>
- <!--<include>**/*.png</include>-->
<include>**/*.en</include>
<include>**/*.md</include>
</includes>
</fileSet>
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
- <includes>
- <include>deployment/docker-compose.yml</include>
- </includes>
- </fileSet>
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
+ <fileSet filtered="false" packaged="false" >
+ <directory>src/main/resources</directory>
<includes>
- <include>deployment/system</include>
+ <include>**/*.png</include>
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
@@ -76,4 +69,4 @@
<defaultValue>example</defaultValue>
</requiredProperty>
</requiredProperties>
-</archetype-descriptor>
+</archetype-descriptor>
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/pom.xml
index 96c22e6..7c02150 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/pom.xml
@@ -24,7 +24,7 @@
<version>${version}</version>
<properties>
- <sp.version>0.67.0-SNAPSHOT</sp.version>
+ <sp.version>0.68.0-SNAPSHOT</sp.version>
</properties>
<dependencies>
@@ -97,7 +97,7 @@
<goal>repackage</goal>
</goals>
<configuration>
- <mainClass>${package}.main.Init</mainClass>
+ <mainClass>${package}.Init</mainClass>
</configuration>
</execution>
</executions>
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
similarity index 77%
rename from archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java
rename to archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
index d3d9c05..2f55329 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/main/Init.java
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -15,48 +15,39 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
-package ${package}.main;
+
+package ${package};
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
-
-
-import ${package}.config.Config;
-import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller;
-
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
-import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
-import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
+import ${package}.config.Config;
+import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller;
public class Init extends StandaloneModelSubmitter {
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
DeclarersSingleton.getInstance()
.add(new ${classNamePrefix}Controller());
- DeclarersSingleton.getInstance().setPort(Config.INSTANCE.getPort());
- DeclarersSingleton.getInstance().setHostName(Config.INSTANCE.getHost());
-
- DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
+ DeclarersSingleton.getInstance().registerDataFormats(
+ new JsonDataFormatFactory(),
new CborDataFormatFactory(),
new SmileDataFormatFactory(),
new FstDataFormatFactory());
- DeclarersSingleton.getInstance().registerProtocols(new SpKafkaProtocolFactory(),
+ DeclarersSingleton.getInstance().registerProtocols(
+ new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory());
new Init().init(Config.INSTANCE);
-
}
-
-
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java
index f1b4a9e..6ba303e 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/Config.java
@@ -15,10 +15,11 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
+
package ${package}.config;
import org.apache.streampipes.config.SpConfig;
@@ -27,52 +28,28 @@ import org.apache.streampipes.container.model.PeConfig;
import static ${package}.config.ConfigKeys.*;
public enum Config implements PeConfig {
-
INSTANCE;
private SpConfig config;
-
- public final static String serverUrl;
- public final static String iconBaseUrl;
-
- private final static String SERVICE_ID= "pe/${package}";
+ private final static String SERVICE_ID= "pe/${package}.sink.jvm";
Config() {
- config = SpConfig.getSpConfig("pe/${package}");
-
- config.register(HOST, "${artifactId}", "Hostname for the pe sinks");
- config.register(PORT, 8090, "Port for the pe sinks");
- config.register(ICON_HOST, "backend", "Hostname for the icon host");
- config.register(ICON_PORT, 80, "Port for the icons in nginx");
-
- config.register(SERVICE_NAME, "${packageName}", "The name of the service");
- }
-
- static {
- serverUrl = Config.INSTANCE.getHost() + ":" + Config.INSTANCE.getPort();
- iconBaseUrl = "http://" + Config.INSTANCE.getIconHost() + ":" + Config.INSTANCE.getIconPort() + "/assets/img/pe_icons";
- }
-
- public static final String getIconUrl(String pictureName) {
- return iconBaseUrl + "/" + pictureName + ".png";
+ config = SpConfig.getSpConfig(SERVICE_ID);
+ config.register(HOST, "${artifactId}", "Data sink host");
+ config.register(PORT, 8090, "Data sink port");
+ config.register(SERVICE_NAME, "${svc_name}", "Data sink service name");
}
+ @Override
public String getHost() {
return config.getString(HOST);
}
+ @Override
public int getPort() {
return config.getInteger(PORT);
}
- public String getIconHost() {
- return config.getString(ICON_HOST);
- }
-
- public int getIconPort() {
- return config.getInteger(ICON_PORT);
- }
-
@Override
public String getId() {
return SERVICE_ID;
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index 8e9477a..28d894c 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -24,7 +24,5 @@ package ${package}.config;
public class ConfigKeys {
final static String HOST = "SP_HOST";
final static String PORT = "SP_PORT";
- final static String ICON_HOST = "SP_ICON_HOST";
- final static String ICON_PORT = "SP_ICON_PORT";
final static String SERVICE_NAME = "SP_SERVICE_NAME";
}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__.java
index 095608f..33a7ff7 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__.java
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__.java
@@ -15,37 +15,38 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
package ${package}.pe.sink.${packageName};
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
-import org.slf4j.Logger;
-
-
public class ${classNamePrefix} implements EventSink<${classNamePrefix}Parameters> {
-private static Logger LOG;
+ private String host;
+ private int port;
+ private String password;
@Override
- public void onInvocation(${classNamePrefix}Parameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+ public void onInvocation(${classNamePrefix}Parameters parameters, EventSinkRuntimeContext runtimeContext) {
+ this.host = parameters.getHost();
+ this.port = parameters.getPort();
+ this.password = parameters.getPassword();
+ // TODO init connection to db here
}
@Override
- public void onEvent(Event event) throws SpRuntimeException {
-
+ public void onEvent(Event event) {
+ // TODO add logic here
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onDetach() {
+ // TODO close connection to db
}
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
index 9cc7dd1..2290eac 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
@@ -15,10 +15,10 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+
package ${package}.pe.sink.${packageName};
import org.apache.streampipes.model.DataSinkType;
@@ -38,7 +38,9 @@ import org.apache.streampipes.sdk.utils.Assets;
public class ${classNamePrefix}Controller extends StandaloneEventSinkDeclarer<${classNamePrefix}Parameters> {
- private static final String EXAMPLE_KEY = "example-key";
+ private static final String HOST_KEY = "host-key";
+ private static final String PORT_KEY = "port-key";
+ private static final String PASSWORD_KEY = "password-key";
@Override
public DataSinkDescription declareModel() {
@@ -50,19 +52,20 @@ public class ${classNamePrefix}Controller extends StandaloneEventSinkDeclarer<${
.create()
.requiredProperty(EpRequirements.anyProperty())
.build())
- .requiredTextParameter(Labels.from(EXAMPLE_KEY, "Example Text Parameter", "Example " +
- "Text Parameter Description"))
+ .requiredTextParameter(Labels.withId(HOST_KEY))
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 1234)
+ .requiredSecret(Labels.withId(PASSWORD_KEY))
.build();
}
@Override
public ConfiguredEventSink<${classNamePrefix}Parameters> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
- String exampleString = extractor.singleValueParameter(EXAMPLE_KEY, String.class);
-
- ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, exampleString);
+ String host = extractor.singleValueParameter(HOST_KEY, String.class);
+ int port = extractor.singleValueParameter(PORT_KEY, Integer.class);
+ String password = extractor.secretValue(PASSWORD_KEY);
+ ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, host, port, password);
return new ConfiguredEventSink<>(params, ${classNamePrefix}::new);
}
-
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Parameters.java b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Parameters.java
index 72e02e5..ff46465 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Parameters.java
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Parameters.java
@@ -15,10 +15,10 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+
package ${package}.pe.sink.${packageName};
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -26,15 +26,27 @@ import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
public class ${classNamePrefix}Parameters extends EventSinkBindingParams {
- private String exampleText;
+ private String host;
+ private int port;
+ private String password;
- public ${classNamePrefix}Parameters(DataSinkInvocation graph, String exampleText) {
+ public ${classNamePrefix}Parameters(DataSinkInvocation graph, String host, int port, String password) {
super(graph);
- this.exampleText = exampleText;
+ this.host = host;
+ this.port = port;
+ this.password = password;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
}
- public String getExampleText() {
- return exampleText;
+ public String getPassword() {
+ return password;
}
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/documentation.md b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/documentation.md
index d91bf67..2e51497 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/documentation.md
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/documentation.md
@@ -15,8 +15,9 @@
~ limitations under the License.
~
-->
+#set( $double_pound = '##' )
-## ${classNamePrefix}
+${double_pound} ${classNamePrefix}
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -24,19 +25,15 @@
***
-## Description
-
-Describe your new processor here!
+${double_pound} Description
+Describe your new data sink here!
***
-## Required input
-What are the input requirements of your processor?
+${double_pound} Required input
+What are the input requirements of your data sink?
***
-## Configuration
-What are the configurations a user has to provide
-
-## Output
-How do the events your processor emits look like.
+${double_pound} Configuration
+What are the configurations a user has to provide?
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/icon.png b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/icon.png
index 5666a68..05d4b1d 100644
Binary files a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/icon.png and b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/icon.png differ
diff --git a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/strings.en b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/strings.en
index e453cf8..dc890af 100644
--- a/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/strings.en
+++ b/archetypes/streampipes-archetype-pe-sinks-jvm/src/main/resources/archetype-resources/src/main/resources/__package__.pe.sink.__packageName__/strings.en
@@ -1,5 +1,11 @@
${package}.pe.sink.${packageName}.title=${classNamePrefix}
-${package}.pe.sink.${packageName}.description=Description of sink
+${package}.pe.sink.${packageName}.description=Description of data sink
-example-key.title=Example Text Parameter
-example-key.description=Example Text Parameter Description
+host-key.title=Host
+host-key.description=IP/DNS of service
+
+port-key.title=Port
+port-key.description=Port of service
+
+password-key.title=Password
+password-key.description=Password to connect
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/META-INF/maven/archetype-metadata.xml b/archetypes/streampipes-archetype-pe-sources/src/main/resources/META-INF/maven/archetype-metadata.xml
index 4777843..8b8a694 100644
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -36,21 +36,22 @@
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory></directory>
+ <directory/>
<includes>
<include>Dockerfile</include>
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
+ <directory>src/main/resources</directory>
<includes>
- <include>deployment/docker-compose.yml</include>
+ <include>**/*.en</include>
+ <include>**/*.md</include>
</includes>
</fileSet>
- <fileSet filtered="true" packaged="false" encoding="UTF-8">
- <directory/>
+ <fileSet filtered="false" packaged="false" >
+ <directory>src/main/resources</directory>
<includes>
- <include>deployment/system</include>
+ <include>**/*.png</include>
</includes>
</fileSet>
<fileSet filtered="true" packaged="false" encoding="UTF-8">
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/Dockerfile b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/Dockerfile
index 5bae36f..c339ac4 100644
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/Dockerfile
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/Dockerfile
@@ -16,11 +16,8 @@
FROM adoptopenjdk/openjdk8-openj9:alpine-slim
ENV CONSUL_LOCATION consul
-
EXPOSE 8090
-RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
-
-COPY target/${artifactId}.jar /${artifactId}.jar
+COPY target/${artifactId}.jar /streampipes-processing-element-container.jar
-ENTRYPOINT ["java", "-jar", "/${artifactId}.jar"]
+ENTRYPOINT ["java", "-jar", "/streampipes-processing-element-container.jar"]
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/development/env b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/development/env
index 2eaca6f..684d7f8 100644
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/development/env
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/development/env
@@ -17,4 +17,5 @@
SP_PORT=8005
SP_HOST=host.docker.internal
SP_DEBUG=true
-SP_FLINK_DEBUG=true
\ No newline at end of file
+SP_KAFKA_HOST=localhost
+SP_KAFKA_PORT=9094
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/pom.xml
index b413045..1b070c7 100644
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/pom.xml
@@ -25,7 +25,7 @@
<version>${version}</version>
<properties>
- <sp.version>0.67.0-SNAPSHOT</sp.version>
+ <sp.version>0.68.0-SNAPSHOT</sp.version>
</properties>
<dependencies>
@@ -103,7 +103,7 @@
<goal>repackage</goal>
</goals>
<configuration>
- <mainClass>${package}.main.Init</mainClass>
+ <mainClass>${package}.Init</mainClass>
</configuration>
</execution>
</executions>
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/main/Init.java b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/Init.java
similarity index 94%
rename from archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/main/Init.java
rename to archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/Init.java
index 5a8a4f0..face0d2 100644
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/main/Init.java
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -19,12 +19,12 @@
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
-package ${package}.main;
+package ${package};
import org.apache.streampipes.container.init.DeclarersSingleton;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
import ${package}.config.Config;
-import ${package}.pe.${packageName}.DataSource;
+import ${package}.pe.source.${packageName}.DataSource;
public class Init extends StandaloneModelSubmitter {
@@ -33,6 +33,5 @@ public class Init extends StandaloneModelSubmitter {
.add(new DataSource());
new Init().init(Config.INSTANCE);
-
}
}
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/config/Config.java
index af2eb31..b35d331 100644
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/config/Config.java
@@ -15,34 +15,37 @@
* limitations under the License.
*
*/
-
#set( $symbol_pound = '#' )
#set( $symbol_dollar = '$' )
#set( $symbol_escape = '\' )
+#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
+
package ${package}.config;
import org.apache.streampipes.config.SpConfig;
import org.apache.streampipes.container.model.PeConfig;
public enum Config implements PeConfig {
-
INSTANCE;
private SpConfig config;
- public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
-
- private final static String SERVICE_ID = "pe/${package}";
+ private final static String SERVICE_ID = "pe/${package}.source";
Config() {
config = SpConfig.getSpConfig(SERVICE_ID);
+ config.register(ConfigKeys.HOST, "${artifactId}", "Data source host");
+ config.register(ConfigKeys.PORT, 8090, "Data source port");
+ config.register(ConfigKeys.SERVICE_NAME, "${svc_name}", "Data source service name");
+ config.register(ConfigKeys.KAFKA_HOST, "kafka", "Kafka host");
+ config.register(ConfigKeys.KAFKA_PORT, 9092, "Kafka port");
+ }
- config.register(ConfigKeys.HOST, "${artifactId}", "Hostname for the pe source component");
- config.register(ConfigKeys.PORT, 8090, "Port for the pe source component");
- config.register(ConfigKeys.ICON_HOST, "backend", "Hostname for the icon host");
- config.register(ConfigKeys.ICON_PORT, 80, "Port for the icons in nginx");
-
- config.register(ConfigKeys.SERVICE_NAME, "${packageName}", "The name of the service");
+ public String getKafkaHost() {
+ return config.getString(ConfigKeys.KAFKA_HOST);
+ }
+ public int getKafkaPort() {
+ return config.getInteger(ConfigKeys.KAFKA_PORT);
}
@Override
@@ -55,21 +58,6 @@ public enum Config implements PeConfig {
return config.getInteger(ConfigKeys.PORT);
}
- public static final String iconBaseUrl = "http://" + Config.INSTANCE.getIconHost() + ":" +
- Config.INSTANCE.getIconPort() + "/assets/img/pe_icons";
-
- public static final String getIconUrl(String pictureName) {
- return iconBaseUrl + "/" + pictureName + ".png";
- }
-
- public String getIconHost() {
- return config.getString(ConfigKeys.ICON_HOST);
- }
-
- public int getIconPort() {
- return config.getInteger(ConfigKeys.ICON_PORT);
- }
-
@Override
public String getId() {
return SERVICE_ID;
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index 8e9477a..5baebbd 100644
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -24,7 +24,7 @@ package ${package}.config;
public class ConfigKeys {
final static String HOST = "SP_HOST";
final static String PORT = "SP_PORT";
- final static String ICON_HOST = "SP_ICON_HOST";
- final static String ICON_PORT = "SP_ICON_PORT";
final static String SERVICE_NAME = "SP_SERVICE_NAME";
+ final static String KAFKA_HOST = "SP_KAFKA_HOST";
+ final static String KAFKA_PORT = "SP_KAFKA_PORT";
}
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/__packageName__/__classNamePrefix__Stream.java b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/__packageName__/__classNamePrefix__Stream.java
deleted file mode 100644
index 42aaa0c..0000000
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/__packageName__/__classNamePrefix__Stream.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#set($symbol_pound='#')
-#set($symbol_dollar='$')
-#set($symbol_escape='\' )
-
-package ${package}.pe.${packageName};
-
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.graph.DataSourceDescription;
-import org.apache.streampipes.sdk.builder.DataStreamBuilder;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.Formats;
-import org.apache.streampipes.sdk.helpers.Protocols;
-import org.apache.streampipes.sources.AbstractAdapterIncludedStream;
-
-
-public class ${classNamePrefix}Stream extends AbstractAdapterIncludedStream {
-
- @Override
- public SpDataStream declareModel(DataSourceDescription sep) {
- return DataStreamBuilder.create("${package}-${packageName}", "${classNamePrefix}", "")
- .property(EpProperties.timestampProperty("timestamp"))
-
- // configure your stream here
-
- .format(Formats.jsonFormat())
- .protocol(Protocols.kafka("localhost", 9092, "TOPIC_SHOULD_BE_CHANGED"))
- .build();
- }
-
- @Override
- public void executeStream() {
-
- }
-}
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/__packageName__/DataSource.java b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/source.__packageName__/DataSource.java
similarity index 90%
rename from archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/__packageName__/DataSource.java
rename to archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/source.__packageName__/DataSource.java
index 83ae921..734b558 100644
--- a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/__packageName__/DataSource.java
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/source.__packageName__/DataSource.java
@@ -16,7 +16,7 @@
*
*/
-package ${package}.pe.${packageName};
+package ${package}.pe.source.${packageName};
import org.apache.streampipes.container.declarer.DataStreamDeclarer;
import org.apache.streampipes.container.declarer.SemanticEventProducerDeclarer;
@@ -30,7 +30,7 @@ import java.util.List;
public class DataSource implements SemanticEventProducerDeclarer {
public DataSourceDescription declareModel() {
- return DataSourceBuilder.create("${package}.${packageName}.source", "${classNamePrefix} " +
+ return DataSourceBuilder.create("${package}.pe.source.${packageName}", "${classNamePrefix} " +
"Source", "")
.build();
}
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/source.__packageName__/__classNamePrefix__Stream.java b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/source.__packageName__/__classNamePrefix__Stream.java
new file mode 100644
index 0000000..dc08881
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/java/pe/source.__packageName__/__classNamePrefix__Stream.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#set($symbol_pound='#')
+#set($symbol_dollar='$')
+#set($symbol_escape='\' )
+
+package ${package}.pe.source.${packageName};
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataSourceDescription;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.DataStreamBuilder;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.Formats;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Protocols;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.sources.AbstractAdapterIncludedStream;
+import org.apache.streampipes.vocabulary.SO;
+import ${package}.config.Config;
+
+public class ${classNamePrefix}Stream extends AbstractAdapterIncludedStream {
+
+ @Override
+ public SpDataStream declareModel(DataSourceDescription sep) {
+ return DataStreamBuilder.create("${package}.pe.source.${packageName}")
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .property(EpProperties.timestampProperty("timestamp"))
+
+ // TODO configure your stream here, see following as example
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.String, "sensorId")
+ .label("Sensor ID")
+ .description("The ID of the sensor")
+ .scope(PropertyScope.DIMENSION_PROPERTY)
+ .build())
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Float, "value")
+ .label("Value")
+ .description("Current value in the sensor")
+ .domainProperty(SO.Number)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+
+ .format(Formats.jsonFormat())
+ // TODO change kafka topic
+ .protocol(Protocols.kafka(Config.INSTANCE.getKafkaHost(), Config.INSTANCE.getKafkaPort(), "${package}.source.jvm"))
+ .build();
+ }
+
+ @Override
+ public void executeStream() {
+ // TODO add logic here
+ }
+}
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/documentation.md
similarity index 77%
copy from archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md
copy to archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/documentation.md
index d91bf67..4f09bac 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/resources/__package__.pe.processor.__packageName__/documentation.md
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/documentation.md
@@ -15,8 +15,9 @@
~ limitations under the License.
~
-->
+#set( $double_pound = '##' )
-## ${classNamePrefix}
+${double_pound} ${classNamePrefix}
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -24,19 +25,10 @@
***
-## Description
-
-Describe your new processor here!
+${double_pound} Description
+Describe your new data source here!
***
-## Required input
-What are the input requirements of your processor?
-
-***
-
-## Configuration
-What are the configurations a user has to provide
-
-## Output
-How do the events your processor emits look like.
+${double_pound} Output
+What do the events your data source emits look like?
\ No newline at end of file
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/icon.png b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/icon.png
new file mode 100644
index 0000000..05d4b1d
Binary files /dev/null and b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/icon.png differ
diff --git a/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/strings.en b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/strings.en
new file mode 100644
index 0000000..acdd197
--- /dev/null
+++ b/archetypes/streampipes-archetype-pe-sources/src/main/resources/archetype-resources/src/main/resources/__package__.pe.source.__packageName__/strings.en
@@ -0,0 +1,2 @@
+${package}.pe.source.${packageName}.title=${classNamePrefix}
+${package}.pe.source.${packageName}.description=Description of data source