You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/05/26 16:10:39 UTC
[incubator-streampipes-extensions] branch dev updated:
[STREAMPIPES-144] Add Telegram publisher to send notifications
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 4d1e078 [STREAMPIPES-144] Add Telegram publisher to send notifications
new d37f242 Merge pull request #16 from grainier/STREAMPIPES-144
4d1e078 is described below
commit 4d1e078b8e4599c6099fe7a4ef4889023a4fd7f3
Author: Grainier <gr...@wso2.com>
AuthorDate: Tue May 26 18:34:08 2020 +0530
[STREAMPIPES-144] Add Telegram publisher to send notifications
---
streampipes-sinks-notifications-jvm/pom.xml | 4 +
.../jvm/SinksNotificationsJvmInit.java | 4 +-
.../jvm/telegram/TelegramController.java | 65 +++++++++++++++
.../jvm/telegram/TelegramParameters.java | 52 ++++++++++++
.../jvm/telegram/TelegramPublisher.java | 92 +++++++++++++++++++++
.../documentation.md | 62 ++++++++++++++
.../icon.png | Bin 0 -> 6844 bytes
.../strings.en | 11 +++
8 files changed, 289 insertions(+), 1 deletion(-)
diff --git a/streampipes-sinks-notifications-jvm/pom.xml b/streampipes-sinks-notifications-jvm/pom.xml
index 8abcc21..af23fc4 100644
--- a/streampipes-sinks-notifications-jvm/pom.xml
+++ b/streampipes-sinks-notifications-jvm/pom.xml
@@ -79,6 +79,10 @@
<groupId>javax.mail</groupId>
<artifactId>mailapi</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
index 1ea1c1f..0e07595 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.sinks.notifications.jvm.config.SinksNotificationsJ
import org.apache.streampipes.sinks.notifications.jvm.email.EmailController;
import org.apache.streampipes.sinks.notifications.jvm.onesignal.OneSignalController;
import org.apache.streampipes.sinks.notifications.jvm.slack.SlackNotificationController;
+import org.apache.streampipes.sinks.notifications.jvm.telegram.TelegramController;
public class SinksNotificationsJvmInit extends StandaloneModelSubmitter {
@@ -38,7 +39,8 @@ public class SinksNotificationsJvmInit extends StandaloneModelSubmitter {
.getInstance()
.add(new EmailController())
.add(new OneSignalController())
- .add(new SlackNotificationController());
+ .add(new SlackNotificationController())
+ .add(new TelegramController());
DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
new CborDataFormatFactory(),
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramController.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramController.java
new file mode 100644
index 0000000..cf4a541
--- /dev/null
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramController.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.notifications.jvm.telegram;
+
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
+
+/**
+ * Declarer for the Telegram notification sink.
+ *
+ * <p>Describes the sink (one secret and two text parameters on top of a stream with any
+ * property) and, on invocation, hands the extracted configuration to a
+ * {@link TelegramPublisher}.
+ */
+public class TelegramController extends StandaloneEventSinkDeclarer<TelegramParameters> {
+  // Identifiers of the configuration options; they are also used as label ids.
+  private static final String BOT_API_KEY = "api-key";
+  private static final String CHANNEL_NAME = "channel-name";
+  private static final String MESSAGE_TEXT = "message-text";
+
+  /**
+   * Builds the static description of the sink.
+   *
+   * @return the description registered under the id
+   *         {@code org.apache.streampipes.sinks.notifications.jvm.telegram}
+   */
+  @Override
+  public DataSinkDescription declareModel() {
+    return DataSinkBuilder
+        .create("org.apache.streampipes.sinks.notifications.jvm.telegram")
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .category(DataSinkType.NOTIFICATION)
+        .requiredStream(
+            StreamRequirementsBuilder.create()
+                .requiredProperty(EpRequirements.anyProperty())
+                .build())
+        .requiredSecret(Labels.withId(BOT_API_KEY))
+        .requiredTextParameter(Labels.withId(CHANNEL_NAME))
+        .requiredTextParameter(Labels.withId(MESSAGE_TEXT), true, true)
+        .build();
+  }
+
+  /**
+   * Reads the user-provided configuration and wires it to a new publisher.
+   *
+   * @param graph the sink invocation being started
+   * @param extractor accessor for the configured secret and text parameters
+   * @return the sink configuration bound to {@code TelegramPublisher::new}
+   */
+  @Override
+  public ConfiguredEventSink<TelegramParameters> onInvocation(DataSinkInvocation graph,
+                                                              DataSinkParameterExtractor extractor) {
+    // Arguments are evaluated left to right: API key, channel, then message text.
+    TelegramParameters sinkParams = new TelegramParameters(
+        graph,
+        extractor.secretValue(BOT_API_KEY),
+        extractor.singleValueParameter(CHANNEL_NAME, String.class),
+        extractor.singleValueParameter(MESSAGE_TEXT, String.class));
+    return new ConfiguredEventSink<>(sinkParams, TelegramPublisher::new);
+  }
+}
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramParameters.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramParameters.java
new file mode 100644
index 0000000..b792381
--- /dev/null
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramParameters.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.notifications.jvm.telegram;
+
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
+
+/**
+ * Immutable holder for the user configuration of the Telegram sink: the bot API key,
+ * the target channel and the message template.
+ */
+public class TelegramParameters extends EventSinkBindingParams {
+  // Assigned once in the constructor and only exposed through getters.
+  private final String apiKey;
+  private final String channel;
+  private final String message;
+
+  /**
+   * @param graph the sink invocation these parameters belong to
+   * @param apiKey the Telegram bot API key
+   * @param channel the channel the message is sent to
+   * @param message the message text configured by the user
+   */
+  public TelegramParameters(DataSinkInvocation graph,
+                            String apiKey,
+                            String channel,
+                            String message) {
+    super(graph);
+    this.apiKey = apiKey;
+    this.channel = channel;
+    this.message = message;
+  }
+
+  /** @return the Telegram bot API key */
+  public String getApiKey() {
+    return apiKey;
+  }
+
+  /** @return the channel the message is sent to */
+  public String getChannel() {
+    return channel;
+  }
+
+  /** @return the message text configured by the user */
+  public String getMessage() {
+    return message;
+  }
+
+}
+
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramPublisher.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramPublisher.java
new file mode 100644
index 0000000..bdc6885
--- /dev/null
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/telegram/TelegramPublisher.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.notifications.jvm.telegram;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.runtime.EventSink;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+
+/**
+ * Event sink that sends one Telegram message per incoming event by calling the
+ * {@code sendMessage} method of the Telegram Bot API.
+ *
+ * <p>Placeholders of the form {@code #fieldName#} in the configured message are replaced
+ * with the value of the matching event field before sending.
+ */
+public class TelegramPublisher implements EventSink<TelegramParameters> {
+  private static final String ENDPOINT = "https://api.telegram.org/bot%s/sendMessage?chat_id=%s&text=%s";
+  // A single client instance is shared by all publisher instances.
+  private static final OkHttpClient HTTP_CLIENT = new OkHttpClient();
+  private static final String HASH_TAG = "#";
+  private String apiKey;
+  private String channel;
+  private String message;
+
+  /**
+   * Stores the configured API key, channel and message template for later events.
+   *
+   * @param parameters the sink configuration
+   * @param runtimeContext the runtime context (not used by this sink)
+   */
+  @Override
+  public void onInvocation(TelegramParameters parameters,
+                           EventSinkRuntimeContext runtimeContext) {
+    this.apiKey = parameters.getApiKey();
+    this.channel = parameters.getChannel();
+    this.message = parameters.getMessage();
+  }
+
+  /**
+   * Renders the message for the given event and sends it to the configured channel.
+   *
+   * @param event the event whose fields fill the message placeholders
+   * @throws SpRuntimeException if the message cannot be encoded, the request fails, or
+   *         Telegram answers with a non-successful HTTP status
+   */
+  @Override
+  public void onEvent(Event event) throws SpRuntimeException {
+    try {
+      String content = trimHTML(replacePlaceholders(event, this.message));
+      // Both query values are encoded so that characters such as '&', '#' or '=' cannot
+      // break or truncate the query string.
+      String url = String.format(ENDPOINT, this.apiKey, encode(this.channel), encode(content));
+      Request request = new Request.Builder().url(url).build();
+      try (Response response = HTTP_CLIENT.newCall(request).execute()) {
+        if (!response.isSuccessful()) {
+          // Report only status code and message: the Response's string form contains the
+          // request URL and would expose the bot API key in logs and error messages.
+          throw new SpRuntimeException(
+              "Could not send message. HTTP status: " + response.code() + " " + response.message());
+        }
+      }
+    } catch (UnsupportedEncodingException e) {
+      // Must precede the IOException handler because it is a subclass of IOException.
+      throw new SpRuntimeException("Could not encode message.", e);
+    } catch (IOException e) {
+      throw new SpRuntimeException("Could not send message.", e);
+    }
+  }
+
+  @Override
+  public void onDetach() {
+    // do nothing.
+  }
+
+  /** URL-encodes a single query parameter value as UTF-8. */
+  private static String encode(String value) throws UnsupportedEncodingException {
+    return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
+  }
+
+  /**
+   * Replaces every {@code #key#} occurrence with the string form of the event field
+   * {@code key}.
+   *
+   * <p>Uses literal replacement: field names are not interpreted as regular expressions
+   * and field values may safely contain {@code $} or backslashes.
+   */
+  private String replacePlaceholders(Event event, String content) {
+    for (Map.Entry<String, Object> entry : event.getRaw().entrySet()) {
+      content = content.replace(
+          HASH_TAG + entry.getKey() + HASH_TAG,
+          String.valueOf(entry.getValue())
+      );
+    }
+    return content;
+  }
+
+  /** Removes the {@code <div>} wrapper markup from the configured message text. */
+  private String trimHTML(String content) {
+    content = content.replace("<div><!--block-->", "");
+    content = content.replace(" </div>", "");
+    return content;
+  }
+}
diff --git a/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/documentation.md b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/documentation.md
new file mode 100644
index 0000000..d33ebf4
--- /dev/null
+++ b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/documentation.md
@@ -0,0 +1,62 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## Telegram Publisher
+
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Publisher to send notifications to a Telegram channel.
+
+To do so, you first have to:
+* Create a public Telegram [channel](https://telegram.org/tour/channels).
+* Create a Telegram BOT via [@BotFather](https://core.telegram.org/bots#3-how-do-i-create-a-bot) and get an API key.
+* Set the bot as [administrator](https://www.wikihow.com/Make-Someone-an-Admin-on-Telegram) in your channel.
+
+***
+
+## Required input
+
+This sink does not have any requirements and works with any incoming event type.
+
+***
+
+## Configuration
+
+The sink is configured with the following parameters.
+
+### Bot API Key
+
+The API Key generated by `@BotFather` when you created your bot.
+
+### Channel Name
+
+The handle name of your channel (e.g. `@channel_name`).
+
+### Message
+
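+The message to be sent. Placeholders of the form `#fieldName#` are replaced with the value of the corresponding event field.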
+
+## Output
+
+(not applicable for data sinks)
diff --git a/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/icon.png b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/icon.png
new file mode 100644
index 0000000..4c44ed6
Binary files /dev/null and b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/icon.png differ
diff --git a/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/strings.en b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/strings.en
new file mode 100644
index 0000000..38faafe
--- /dev/null
+++ b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.telegram/strings.en
@@ -0,0 +1,11 @@
+org.apache.streampipes.sinks.notifications.jvm.telegram.title=Telegram Publisher
+org.apache.streampipes.sinks.notifications.jvm.telegram.description=Publisher to send notifications to a Telegram channel.
+
+api-key.title=Bot API Key
+api-key.description=The API Key generated by @BotFather when you created your bot.
+
+channel-name.title=Channel Name
+channel-name.description=The handle name of your channel (e.g. @channel_name).
+
+message-text.title=Message
+message-text.description=The message to be sent.