You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/03/16 12:52:08 UTC
[incubator-streampipes-extensions] branch dev updated: Modify NotificationProducer
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new f2d884b Modify NotificationProducer
new 2e19f81 Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
f2d884b is described below
commit f2d884b8884c77bc5bdaee76d70784811e3a4dd2
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Mar 16 13:51:39 2020 +0100
Modify NotificationProducer
---
.../jvm/notification/NotificationProducer.java | 25 ++++++++++++++++++----
1 file changed, 21 insertions(+), 4 deletions(-)
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
index 9057a6d..77355dd 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
@@ -32,8 +32,12 @@ import java.util.Date;
public class NotificationProducer implements EventSink<NotificationParameters> {
+ private static final String HASHTAG = "#";
+
private String title;
private String content;
+ private String correspondingPipelineId;
+ private String correspondingUser;
private ActiveMQPublisher publisher;
private Gson gson;
@@ -42,19 +46,25 @@ public class NotificationProducer implements EventSink<NotificationParameters> {
@Override
public void onInvocation(NotificationParameters parameters, EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException {
- this.publisher = new ActiveMQPublisher(SinksInternalJvmConfig.INSTANCE.getJmsHost() + ":" + SinksInternalJvmConfig.INSTANCE.getJmsPort(),
- "org.apache.streampipes.notifications");
this.gson = new Gson();
this.title = parameters.getTitle();
this.content = parameters.getContent();
+ this.correspondingPipelineId = parameters.getGraph().getCorrespondingPipeline();
+ this.correspondingUser = parameters.getGraph().getCorrespondingUser();
+ this.publisher = new ActiveMQPublisher(SinksInternalJvmConfig.INSTANCE.getJmsHost() + ":" + SinksInternalJvmConfig.INSTANCE.getJmsPort(),
+ "org.apache.streampipes.notifications." + this.correspondingUser);
}
@Override
public void onEvent(Event inputEvent) {
+ Date currentDate = new Date();
Notification notification = new Notification();
notification.setTitle(title);
- notification.setMessage(content);
- notification.setCreatedAt(new Date());
+ notification.setMessage(replacePlaceholders(inputEvent, content));
+ notification.setCreatedAt(currentDate);
+ notification.setCreatedAtTimestamp(currentDate.getTime());
+ notification.setCorrespondingPipelineId(correspondingPipelineId);
+ notification.setTargetedAt(correspondingUser);
// TODO add targeted user to notification object
@@ -65,4 +75,11 @@ public class NotificationProducer implements EventSink<NotificationParameters> {
public void onDetach() throws SpRuntimeException {
this.publisher.disconnect();
}
+
+ /**
+  * Fills the notification template with values taken from the given event.
+  *
+  * Every occurrence of {@code #key#} in {@code content}, where {@code key} is a key of
+  * {@code event.getRaw()}, is replaced by the string form of the mapped value.
+  * Keys and values are treated as literal text, never as regular expressions.
+  *
+  * @param event the event whose raw key/value pairs supply the replacement values
+  * @param content the template text containing {@code #key#} placeholders
+  * @return the template with every matching placeholder substituted
+  */
+ private String replacePlaceholders(Event event, String content) {
+   for (String key : event.getRaw().keySet()) {
+     // String.replace substitutes literally. replaceAll would interpret regex
+     // metacharacters in the key and '$' / '\' in the value, which can throw or
+     // corrupt the message. String.valueOf avoids a NullPointerException when the
+     // mapped value is null.
+     content = content.replace(HASHTAG + key + HASHTAG, String.valueOf(event.getRaw().get(key)));
+   }
+   return content;
+ }
}