You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/03/16 12:52:08 UTC

[incubator-streampipes-extensions] branch dev updated: Modify NotificationProducer

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new f2d884b  Modify NotificationProducer
     new 2e19f81  Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
f2d884b is described below

commit f2d884b8884c77bc5bdaee76d70784811e3a4dd2
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Mar 16 13:51:39 2020 +0100

    Modify NotificationProducer
---
 .../jvm/notification/NotificationProducer.java     | 25 ++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
index 9057a6d..77355dd 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
@@ -32,8 +32,12 @@ import java.util.Date;
 
 public class NotificationProducer implements EventSink<NotificationParameters> {
 
+  private static final String HASHTAG = "#";
+
   private String title;
   private String content;
+  private String correspondingPipelineId;
+  private String correspondingUser;
 
   private ActiveMQPublisher publisher;
   private Gson gson;
@@ -42,19 +46,25 @@ public class NotificationProducer implements EventSink<NotificationParameters> {
   @Override
   public void onInvocation(NotificationParameters parameters, EventSinkRuntimeContext runtimeContext) throws
           SpRuntimeException {
-    this.publisher = new ActiveMQPublisher(SinksInternalJvmConfig.INSTANCE.getJmsHost() + ":" + SinksInternalJvmConfig.INSTANCE.getJmsPort(),
-            "org.apache.streampipes.notifications");
     this.gson = new Gson();
     this.title = parameters.getTitle();
     this.content = parameters.getContent();
+    this.correspondingPipelineId = parameters.getGraph().getCorrespondingPipeline();
+    this.correspondingUser = parameters.getGraph().getCorrespondingUser();
+    this.publisher = new ActiveMQPublisher(SinksInternalJvmConfig.INSTANCE.getJmsHost() + ":" + SinksInternalJvmConfig.INSTANCE.getJmsPort(),
+            "org.apache.streampipes.notifications." + this.correspondingUser);
   }
 
   @Override
   public void onEvent(Event inputEvent) {
+    Date currentDate = new Date();
     Notification notification = new Notification();
     notification.setTitle(title);
-    notification.setMessage(content);
-    notification.setCreatedAt(new Date());
+    notification.setMessage(replacePlaceholders(inputEvent, content));
+    notification.setCreatedAt(currentDate);
+    notification.setCreatedAtTimestamp(currentDate.getTime());
+    notification.setCorrespondingPipelineId(correspondingPipelineId);
+    notification.setTargetedAt(correspondingUser);
 
     // TODO add targeted user to notification object
 
@@ -65,4 +75,11 @@ public class NotificationProducer implements EventSink<NotificationParameters> {
   public void onDetach() throws SpRuntimeException {
     this.publisher.disconnect();
   }
+
+  private String replacePlaceholders(Event event, String content) {
+    for(String key: event.getRaw().keySet()) { // substitute each #key# token literally (no regex parsing of key or value)
+      content = content.replace(HASHTAG + key + HASHTAG, String.valueOf(event.getRaw().get(key)));
+    }
+    return content;
+  }
 }