You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/05 11:23:30 UTC

[airflow] branch master updated: Add Telegram hook and operator (#11850)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new cd66450  Add Telegram hook and operator (#11850)
cd66450 is described below

commit cd66450b4ee2a219ddc847970255e420ed679700
Author: Shekhar Singh <sh...@gmail.com>
AuthorDate: Sat Dec 5 16:51:11 2020 +0530

    Add Telegram hook and operator (#11850)
    
    closes: #11845
    
    Adds:
    
    Telegram Hook
    Telegram Operator
---
 CONTRIBUTING.rst                                   |   4 +-
 INSTALL                                            |   4 +-
 airflow/providers/telegram/README.md               |  59 +++++++
 airflow/providers/telegram/__init__.py             |  17 ++
 .../providers/telegram/example_dags/__init__.py    |  17 ++
 .../telegram/example_dags/example_telegram.py      |  47 +++++
 airflow/providers/telegram/hooks/__init__.py       |  17 ++
 airflow/providers/telegram/hooks/telegram.py       | 152 ++++++++++++++++
 airflow/providers/telegram/operators/__init__.py   |  17 ++
 airflow/providers/telegram/operators/telegram.py   |  84 +++++++++
 airflow/providers/telegram/provider.yaml           |  42 +++++
 docs/apache-airflow-providers-telegram/index.rst   |  41 +++++
 .../operators.rst                                  |  59 +++++++
 docs/apache-airflow/extra-packages-ref.rst         | 112 ++++++------
 docs/conf.py                                       |   1 +
 setup.py                                           |   6 +
 tests/core/test_providers_manager.py               |   1 +
 tests/providers/telegram/__init__.py               |  17 ++
 tests/providers/telegram/hooks/__init__.py         |  17 ++
 tests/providers/telegram/hooks/test_telegram.py    | 192 +++++++++++++++++++++
 tests/providers/telegram/operators/__init__.py     |  17 ++
 .../providers/telegram/operators/test_telegram.py  | 149 ++++++++++++++++
 22 files changed, 1013 insertions(+), 59 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 5bddad0..95dd13e 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -553,8 +553,8 @@ gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hiv
 jenkins, jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo,
 mssql, mysql, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
 postgres, presto, qds, qubole, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, sftp,
-singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, vertica, virtualenv, webhdfs,
-winrm, yandex, yandexcloud, zendesk, all, devel, devel_hadoop, doc, devel_all, devel_ci
+singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv,
+webhdfs, winrm, yandex, yandexcloud, zendesk, all, devel, devel_hadoop, doc, devel_all, devel_ci
 
   .. END EXTRAS HERE
 
diff --git a/INSTALL b/INSTALL
index cb04ede..bf44ada 100644
--- a/INSTALL
+++ b/INSTALL
@@ -71,8 +71,8 @@ gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hiv
 jenkins, jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo,
 mssql, mysql, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
 postgres, presto, qds, qubole, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, sftp,
-singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, vertica, virtualenv, webhdfs,
-winrm, yandex, yandexcloud, zendesk, all, devel, devel_hadoop, doc, devel_all, devel_ci
+singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv,
+webhdfs, winrm, yandex, yandexcloud, zendesk, all, devel, devel_hadoop, doc, devel_all, devel_ci
 
 # END EXTRAS HERE
 
diff --git a/airflow/providers/telegram/README.md b/airflow/providers/telegram/README.md
new file mode 100644
index 0000000..f7a7ac8
--- /dev/null
+++ b/airflow/providers/telegram/README.md
@@ -0,0 +1,59 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+
+# Package apache-airflow-providers-telegram
+
+**Table of contents**
+
+- [Provider package](#provider-package)
+- [Installation](#installation)
+- [PIP requirements](#pip-requirements)
+- [Cross provider package dependencies](#cross-provider-package-dependencies)
+
+## Provider package
+
+This is a provider package for `telegram` provider. All classes for this provider package
+are in `airflow.providers.telegram` python package.
+
+## Installation
+
+You can install this package on top of an existing airflow 2.* installation via
+`pip install apache-airflow-providers-telegram`
+
+## PIP requirements
+
+| PIP package           | Version required   |
+|:----------------------|:-------------------|
+| python-telegram-bot   | 13.0               |
+
+## Cross provider package dependencies
+
+These are dependencies that might be needed in order to use all the features of the package.
+You need to install the specified provider packages in order to use them.
+
+You can install such cross-provider dependencies when installing from PyPI. For example:
+
+```bash
+pip install apache-airflow-providers-telegram[http]
+```
+
+| Dependent package                                                                       | Extra   |
+|:----------------------------------------------------------------------------------------|:--------|
+| [apache-airflow-providers-http](https://pypi.org/project/apache-airflow-providers-http) | http    |
diff --git a/airflow/providers/telegram/__init__.py b/airflow/providers/telegram/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/telegram/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/telegram/example_dags/__init__.py b/airflow/providers/telegram/example_dags/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/telegram/example_dags/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/telegram/example_dags/example_telegram.py b/airflow/providers/telegram/example_dags/example_telegram.py
new file mode 100644
index 0000000..6485e80
--- /dev/null
+++ b/airflow/providers/telegram/example_dags/example_telegram.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example use of Telegram operator.
+"""
+
+from airflow import DAG
+from airflow.providers.telegram.operators.telegram import TelegramOperator
+from airflow.utils.dates import days_ago
+
+default_args = {
+    'owner': 'airflow',
+}
+
+dag = DAG(
+    'example_telegram',
+    default_args=default_args,
+    start_date=days_ago(2),
+    tags=['example'],
+)
+
+# [START howto_operator_telegram]
+
+send_message_telegram_task = TelegramOperator(
+    task_id='send_message_telegram',
+    telegram_conn_id='telegram_conn_id',
+    chat_id='-3222103937',
+    text='Hello from Airflow!',
+    dag=dag,
+)
+
+# [END howto_operator_telegram]
diff --git a/airflow/providers/telegram/hooks/__init__.py b/airflow/providers/telegram/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/telegram/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/telegram/hooks/telegram.py b/airflow/providers/telegram/hooks/telegram.py
new file mode 100644
index 0000000..9501a4e
--- /dev/null
+++ b/airflow/providers/telegram/hooks/telegram.py
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Hook for Telegram"""
+from typing import Optional
+
+import telegram
+import tenacity
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+
+
+class TelegramHook(BaseHook):
+    """
+    This hook allows you to post messages to Telegram using the telegram python-telegram-bot library.
+
+    The library can be found here: https://github.com/python-telegram-bot/python-telegram-bot
+    It accepts either a Telegram bot API token directly or a connection that holds the token.
+    If both are supplied, the token parameter takes precedence; otherwise the 'password' field of the
+    connection given by telegram_conn_id is used.
+    chat_id can also be provided through the 'host' field of the connection.
+    The details of a Telegram connection are as follows:
+    name: 'telegram-connection-name'
+    conn_type: 'http'
+    password: 'TELEGRAM_TOKEN'
+    host: 'chat_id' (optional)
+    Examples:
+    .. code-block:: python
+
+        # Create hook
+        telegram_hook = TelegramHook(telegram_conn_id='telegram_default')
+        # or telegram_hook = TelegramHook(telegram_conn_id='telegram_default', chat_id='-1xxx')
+        # or telegram_hook = TelegramHook(token='xxx:xxx', chat_id='-1xxx')
+
+        # Call method from telegram bot client
+        telegram_hook.send_message({"text": "message", "chat_id": "-1xxx"})
+        # or telegram_hook.send_message({"text": "message"})
+
+    :param telegram_conn_id: connection that optionally has Telegram API token in the password field
+    :type telegram_conn_id: str
+    :param token: optional telegram API token
+    :type token: str
+    :param chat_id: optional chat_id of the telegram chat/channel/group
+    :type chat_id: str
+    """
+
+    def __init__(
+        self,
+        telegram_conn_id: Optional[str] = None,
+        token: Optional[str] = None,
+        chat_id: Optional[str] = None,
+    ) -> None:
+        super().__init__()
+        self.token = self.__get_token(token, telegram_conn_id)
+        self.chat_id = self.__get_chat_id(chat_id, telegram_conn_id)
+        self.connection = self.get_conn()
+
+    def get_conn(self) -> telegram.bot.Bot:
+        """
+        Returns the telegram bot client
+
+        :return: telegram bot client
+        :rtype: telegram.bot.Bot
+        """
+        return telegram.bot.Bot(token=self.token)
+
+    def __get_token(self, token: Optional[str], telegram_conn_id: str) -> str:
+        """
+        Returns the telegram API token
+
+        :param token: telegram API token
+        :type token: str
+        :param telegram_conn_id: telegram connection name
+        :type telegram_conn_id: str
+        :return: telegram API token
+        :rtype: str
+        """
+        if token is not None:
+            return token
+
+        if telegram_conn_id is not None:
+            conn = self.get_connection(telegram_conn_id)
+
+            if not conn.password:
+                raise AirflowException("Missing token(password) in Telegram connection")
+
+            return conn.password
+
+        raise AirflowException("Cannot get token: No valid Telegram connection supplied.")
+
+    def __get_chat_id(self, chat_id: Optional[str], telegram_conn_id: str) -> Optional[str]:
+        """
+        Returns the telegram chat ID for a chat/channel/group
+
+        :param chat_id: optional chat ID
+        :type chat_id: str
+        :param telegram_conn_id: telegram connection name
+        :type telegram_conn_id: str
+        :return: telegram chat ID
+        :rtype: str
+        """
+        if chat_id is not None:
+            return chat_id
+
+        if telegram_conn_id is not None:
+            conn = self.get_connection(telegram_conn_id)
+            return conn.host
+
+        return None
+
+    @tenacity.retry(
+        retry=tenacity.retry_if_exception_type(telegram.error.TelegramError),
+        stop=tenacity.stop_after_attempt(5),
+        wait=tenacity.wait_fixed(1),
+    )
+    def send_message(self, api_params: dict) -> None:
+        """
+        Sends the message to a telegram channel or chat.
+
+        :param api_params: params for telegram_instance.send_message. It can also be used to override chat_id
+        :type api_params: dict
+        """
+        kwargs = {
+            "chat_id": self.chat_id,
+            "parse_mode": telegram.parsemode.ParseMode.HTML,
+            "disable_web_page_preview": True,
+        }
+        kwargs.update(api_params)
+
+        if 'text' not in kwargs or kwargs['text'] is None:
+            raise AirflowException("'text' must be provided for telegram message")
+
+        if kwargs['chat_id'] is None:
+            raise AirflowException("'chat_id' must be provided for telegram message")
+
+        response = self.connection.send_message(**kwargs)
+        self.log.debug(response)
diff --git a/airflow/providers/telegram/operators/__init__.py b/airflow/providers/telegram/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/telegram/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/telegram/operators/telegram.py b/airflow/providers/telegram/operators/telegram.py
new file mode 100644
index 0000000..ae01be8
--- /dev/null
+++ b/airflow/providers/telegram/operators/telegram.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Operator for Telegram"""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.telegram.hooks.telegram import TelegramHook
+from airflow.utils.decorators import apply_defaults
+
+
+class TelegramOperator(BaseOperator):
+    """
+    This operator allows you to post messages to Telegram using Telegram Bot API.
+    Takes either a Telegram Bot API token directly or a connection that has the token in its password field.
+    If both are supplied, the token parameter is given precedence.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:TelegramOperator`
+
+    :param telegram_conn_id: Telegram connection ID whose password is the Telegram API token
+    :type telegram_conn_id: str
+    :param token: Telegram API Token
+    :type token: str
+    :param chat_id: Telegram chat ID for a chat/channel/group
+    :type chat_id: str
+    :param text: Message to be sent on telegram
+    :type text: str
+    :param telegram_kwargs: Extra args to be passed to telegram client
+    :type telegram_kwargs: dict
+    """
+
+    template_fields = ('text', 'chat_id')
+    ui_color = '#FFBA40'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        telegram_conn_id: str = "telegram_default",
+        token: Optional[str] = None,
+        chat_id: Optional[str] = None,
+        text: str = "No message has been set.",
+        telegram_kwargs: Optional[dict] = None,
+        **kwargs,
+    ):
+        self.chat_id = chat_id
+        self.token = token
+        self.telegram_kwargs = telegram_kwargs or {}
+
+        if text is not None:
+            self.telegram_kwargs['text'] = text
+
+        if telegram_conn_id is None:
+            raise AirflowException("No valid Telegram connection id supplied.")
+
+        self.telegram_conn_id = telegram_conn_id
+
+        super().__init__(**kwargs)
+
+    def execute(self, **kwargs) -> None:
+        """Calls the TelegramHook to post the provided Telegram message"""
+        telegram_hook = TelegramHook(
+            telegram_conn_id=self.telegram_conn_id,
+            token=self.token,
+            chat_id=self.chat_id,
+        )
+        telegram_hook.send_message(self.telegram_kwargs)
diff --git a/airflow/providers/telegram/provider.yaml b/airflow/providers/telegram/provider.yaml
new file mode 100644
index 0000000..9e2220b
--- /dev/null
+++ b/airflow/providers/telegram/provider.yaml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-telegram
+name: Telegram
+description: |
+    `Telegram <https://telegram.org/>`__
+
+versions:
+  - 0.0.1
+
+integrations:
+  - integration-name: Telegram
+    external-doc-url: https://telegram.org/
+    how-to-guide:
+      - /docs/apache-airflow-providers-telegram/operators.rst
+    tags: [service]
+
+operators:
+  - integration-name: Telegram
+    python-modules:
+      - airflow.providers.telegram.operators.telegram
+
+hooks:
+  - integration-name: Telegram
+    python-modules:
+      - airflow.providers.telegram.hooks.telegram
diff --git a/docs/apache-airflow-providers-telegram/index.rst b/docs/apache-airflow-providers-telegram/index.rst
new file mode 100644
index 0000000..226dc94
--- /dev/null
+++ b/docs/apache-airflow-providers-telegram/index.rst
@@ -0,0 +1,41 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+``apache-airflow-providers-telegram``
+=====================================
+
+Content
+-------
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Guides
+
+    Operators <operators>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: References
+
+    Python API <_api/airflow/providers/telegram/index>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Resources
+
+    Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/telegram/example_dags>
diff --git a/docs/apache-airflow-providers-telegram/operators.rst b/docs/apache-airflow-providers-telegram/operators.rst
new file mode 100644
index 0000000..7937a66
--- /dev/null
+++ b/docs/apache-airflow-providers-telegram/operators.rst
@@ -0,0 +1,59 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+.. _howto/operator:TelegramOperator:
+
+TelegramOperator
+================
+
+Use the :class:`~airflow.providers.telegram.operators.telegram.TelegramOperator`
+to send a message to a `Telegram <https://telegram.org/>`__ group or a specific chat.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+Use the ``telegram_conn_id`` argument to connect to the Telegram client, where
+the connection metadata is structured as follows:
+
+.. list-table:: Telegram Connection Metadata
+   :widths: 25 25
+   :header-rows: 1
+
+   * - Parameter
+     - Input
+   * - Password: string
+     - Telegram Bot API Token
+   * - Host: string
+     - Chat ID of the Telegram group/chat
+   * - Connection Type: string
+     - http as connection type
+
+An example usage of the TelegramOperator is as follows:
+
+.. exampleinclude:: /../../airflow/providers/telegram/example_dags/example_telegram.py
+    :language: python
+    :start-after: [START howto_operator_telegram]
+    :end-before: [END howto_operator_telegram]
+
+.. note::
+
+  Parameters passed directly to the operator take priority over the corresponding values stored
+  in the Airflow connection metadata (such as ``Host``, ``Password`` and so forth).
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index 9836f4c..c92f694 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -79,61 +79,63 @@ Here's the list of the :ref:`subpackages <installation:extra_packages>` and what
 
 **Services:**
 
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| subpackage          | install command                                     | enables                                                              |
-+=====================+=====================================================+======================================================================+
-| aws                 | ``pip install 'apache-airflow[amazon]'``            | Amazon Web Services                                                  |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| azure               | ``pip install 'apache-airflow[microsoft.azure]'``   | Microsoft Azure                                                      |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| cloudant            | ``pip install 'apache-airflow[cloudant]'``          | Cloudant hook                                                        |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| databricks          | ``pip install 'apache-airflow[databricks]'``        | Databricks hooks and operators                                       |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| datadog             | ``pip install 'apache-airflow[datadog]'``           | Datadog hooks and sensors                                            |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| dingding            | ``pip install 'apache-airflow[dingding]'``          | Dingding hooks and sensors                                           |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| discord             | ``pip install 'apache-airflow[discord]'``           | Discord hooks and sensors                                            |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| facebook            | ``pip install 'apache-airflow[facebook]'``          | Facebook Social                                                      |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| gcp                 | ``pip install 'apache-airflow[google]'``            | Google Cloud                                                         |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| github_enterprise   | ``pip install 'apache-airflow[github_enterprise]'`` | GitHub Enterprise auth backend                                       |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| google_auth         | ``pip install 'apache-airflow[google_auth]'``       | Google auth backend                                                  |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| hashicorp           | ``pip install 'apache-airflow[hashicorp]'``         | Hashicorp Services (Vault)                                           |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| jira                | ``pip install 'apache-airflow[jira]'``              | Jira hooks and operators                                             |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| opsgenie            | ``pip install 'apache-airflow[opsgenie]'``          | OpsGenie hooks and operators                                         |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| pagerduty           | ``pip install 'apache-airflow[pagerduty]'``         | Pagerduty hook                                                       |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| plexus              | ``pip install 'apache-airflow[plexus]'``            | Plexus service of CoreScientific.com AI platform                     |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| qds                 | ``pip install 'apache-airflow[qds]'``               | Enable QDS (Qubole Data Service) support                             |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| salesforce          | ``pip install 'apache-airflow[salesforce]'``        | Salesforce hook                                                      |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| sendgrid            | ``pip install 'apache-airflow[sendgrid]'``          | Send email using sendgrid                                            |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| segment             | ``pip install 'apache-airflow[segment]'``           | Segment hooks and sensors                                            |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| sentry              | ``pip install 'apache-airflow[sentry]'``            | Sentry service for application logging and monitoring                |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| slack               | ``pip install 'apache-airflow[slack]'``             | :class:`airflow.providers.slack.operators.slack.SlackAPIOperator`    |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| snowflake           | ``pip install 'apache-airflow[snowflake]'``         | Snowflake hooks and operators                                        |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| vertica             | ``pip install 'apache-airflow[vertica]'``           | Vertica hook support as an Airflow backend                           |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| yandexcloud         | ``pip install 'apache-airflow[yandexcloud]'``       | Yandex.Cloud hooks and operators                                     |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
-| zendesk             | ``pip install 'apache-airflow[zendesk]'``           | Zendesk hooks                                                        |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| subpackage          | install command                                     | enables                                                                    |
++=====================+=====================================================+============================================================================+
+| aws                 | ``pip install 'apache-airflow[amazon]'``            | Amazon Web Services                                                        |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| azure               | ``pip install 'apache-airflow[microsoft.azure]'``   | Microsoft Azure                                                            |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| cloudant            | ``pip install 'apache-airflow[cloudant]'``          | Cloudant hook                                                              |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| databricks          | ``pip install 'apache-airflow[databricks]'``        | Databricks hooks and operators                                             |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| datadog             | ``pip install 'apache-airflow[datadog]'``           | Datadog hooks and sensors                                                  |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| dingding            | ``pip install 'apache-airflow[dingding]'``          | Dingding hooks and sensors                                                 |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| discord             | ``pip install 'apache-airflow[discord]'``           | Discord hooks and sensors                                                  |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| facebook            | ``pip install 'apache-airflow[facebook]'``          | Facebook Social                                                            |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| gcp                 | ``pip install 'apache-airflow[google]'``            | Google Cloud                                                               |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| github_enterprise   | ``pip install 'apache-airflow[github_enterprise]'`` | GitHub Enterprise auth backend                                             |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| google_auth         | ``pip install 'apache-airflow[google_auth]'``       | Google auth backend                                                        |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| hashicorp           | ``pip install 'apache-airflow[hashicorp]'``         | Hashicorp Services (Vault)                                                 |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| jira                | ``pip install 'apache-airflow[jira]'``              | Jira hooks and operators                                                   |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| opsgenie            | ``pip install 'apache-airflow[opsgenie]'``          | OpsGenie hooks and operators                                               |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| pagerduty           | ``pip install 'apache-airflow[pagerduty]'``         | Pagerduty hook                                                             |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| plexus              | ``pip install 'apache-airflow[plexus]'``            | Plexus service of CoreScientific.com AI platform                           |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| qds                 | ``pip install 'apache-airflow[qds]'``               | Enable QDS (Qubole Data Service) support                                   |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| salesforce          | ``pip install 'apache-airflow[salesforce]'``        | Salesforce hook                                                            |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| sendgrid            | ``pip install 'apache-airflow[sendgrid]'``          | Send email using sendgrid                                                  |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| segment             | ``pip install 'apache-airflow[segment]'``           | Segment hooks and sensors                                                  |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| sentry              | ``pip install 'apache-airflow[sentry]'``            | Sentry service for application logging and monitoring                      |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| slack               | ``pip install 'apache-airflow[slack]'``             | :class:`airflow.providers.slack.operators.slack.SlackAPIOperator`          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| snowflake           | ``pip install 'apache-airflow[snowflake]'``         | Snowflake hooks and operators                                              |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| telegram            | ``pip install 'apache-airflow[telegram]'``          | :class:`airflow.providers.telegram.operators.telegram.TelegramOperator`    |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| vertica             | ``pip install 'apache-airflow[vertica]'``           | Vertica hook support as an Airflow backend                                 |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| yandexcloud         | ``pip install 'apache-airflow[yandexcloud]'``       | Yandex.Cloud hooks and operators                                           |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| zendesk             | ``pip install 'apache-airflow[zendesk]'``           | Zendesk hooks                                                              |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 
 
 **Software:**
diff --git a/docs/conf.py b/docs/conf.py
index bfd04b5..86fc2cf 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -353,6 +353,7 @@ autodoc_mock_imports = [
     'smbclient',
     'snowflake',
     'sshtunnel',
+    'telegram',
     'tenacity',
     'vertica_python',
     'winrm',
diff --git a/setup.py b/setup.py
index 5404b82..b5aa93b 100644
--- a/setup.py
+++ b/setup.py
@@ -412,6 +412,9 @@ statsd = [
 tableau = [
     'tableauserverclient~=0.12',
 ]
+telegram = [
+    'python-telegram-bot==13.0',
+]
 vertica = [
     'vertica-python>=0.5.1',
 ]
@@ -577,6 +580,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, Iterable[str]] = {
     "snowflake": snowflake,
     "sqlite": [],
     "ssh": ssh,
+    "telegram": telegram,
     "vertica": vertica,
     "yandex": yandexcloud,
     "zendesk": zendesk,
@@ -670,6 +674,7 @@ EXTRAS_REQUIREMENTS: Dict[str, List[str]] = {
     'ssh': ssh,
     'statsd': statsd,
     'tableau': tableau,
+    'telegram': telegram,
     'vertica': vertica,
     'virtualenv': virtualenv,
     'webhdfs': webhdfs,  # TODO: remove this in Airflow 2.1
@@ -789,6 +794,7 @@ EXTRAS_PROVIDERS_PACKAGES: Dict[str, Iterable[str]] = {
     'ssh': ["ssh"],
     'statsd': [],
     'tableau': [],
+    'telegram': ["telegram"],
     'vertica': ["vertica"],
     'virtualenv': [],
     'webhdfs': ["apache.hdfs"],  # TODO: remove this in Airflow 2.1
diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py
index 69e2027..313e5d8 100644
--- a/tests/core/test_providers_manager.py
+++ b/tests/core/test_providers_manager.py
@@ -77,6 +77,7 @@ ALL_PROVIDERS = [
     'apache-airflow-providers-snowflake',
     'apache-airflow-providers-sqlite',
     'apache-airflow-providers-ssh',
+    'apache-airflow-providers-telegram',
     'apache-airflow-providers-vertica',
     'apache-airflow-providers-yandex',
     'apache-airflow-providers-zendesk',
diff --git a/tests/providers/telegram/__init__.py b/tests/providers/telegram/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/telegram/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/telegram/hooks/__init__.py b/tests/providers/telegram/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/telegram/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/telegram/hooks/test_telegram.py b/tests/providers/telegram/hooks/test_telegram.py
new file mode 100644
index 0000000..1215be4
--- /dev/null
+++ b/tests/providers/telegram/hooks/test_telegram.py
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+import telegram
+
+import airflow
+from airflow.models import Connection
+from airflow.providers.telegram.hooks.telegram import TelegramHook
+from airflow.utils import db
+
+TELEGRAM_TOKEN = "dummy token"
+
+
+class TestTelegramHook(unittest.TestCase):
+    """Tests for TelegramHook: credential lookup, chat_id resolution, sending and retrying."""
+
+    def setUp(self):
+        """Register the Airflow connections that the tests look up by ``conn_id``."""
+        db.merge_conn(
+            Connection(
+                conn_id='telegram-webhook-without-token',
+                conn_type='http',
+            )
+        )
+        db.merge_conn(
+            Connection(
+                conn_id='telegram_default',
+                conn_type='http',
+                password=TELEGRAM_TOKEN,
+            )
+        )
+        db.merge_conn(
+            Connection(
+                conn_id='telegram-webhook-with-chat_id',
+                conn_type='http',
+                password=TELEGRAM_TOKEN,
+                host="-420913222",
+            )
+        )
+
+    def test_should_raise_exception_if_both_connection_or_token_is_not_provided(self):
+        """The hook cannot be built when neither a connection id nor a token is given."""
+        with self.assertRaises(airflow.exceptions.AirflowException) as e:
+            TelegramHook()
+
+        self.assertEqual("Cannot get token: No valid Telegram connection supplied.", str(e.exception))
+
+    def test_should_raise_exception_if_conn_id_doesnt_exist(self):
+        """An unknown connection id surfaces as AirflowNotFoundException."""
+        with self.assertRaises(airflow.exceptions.AirflowNotFoundException) as e:
+            TelegramHook(telegram_conn_id='telegram-webhook-non-existent')
+
+        self.assertEqual("The conn_id `telegram-webhook-non-existent` isn't defined", str(e.exception))
+
+    def test_should_raise_exception_if_conn_id_doesnt_contain_token(self):
+        """A connection without a password (the bot token) is rejected."""
+        with self.assertRaises(airflow.exceptions.AirflowException) as e:
+            TelegramHook(telegram_conn_id='telegram-webhook-without-token')
+
+        self.assertEqual("Missing token(password) in Telegram connection", str(e.exception))
+
+    @mock.patch('airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn')
+    def test_should_raise_exception_if_chat_id_is_not_provided_anywhere(self, mock_get_conn):
+        """Sending fails when no chat_id comes from the call, the constructor or the connection."""
+        # Build the hook outside assertRaises so that only send_message can satisfy it.
+        hook = TelegramHook(telegram_conn_id='telegram_default')
+        with self.assertRaises(airflow.exceptions.AirflowException) as e:
+            hook.send_message({"text": "test telegram message"})
+
+        self.assertEqual("'chat_id' must be provided for telegram message", str(e.exception))
+
+    @mock.patch('airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn')
+    def test_should_raise_exception_if_message_text_is_not_provided(self, mock_get_conn):
+        """Sending fails when the message payload carries no ``text``."""
+        # Build the hook outside assertRaises so that only send_message can satisfy it.
+        hook = TelegramHook(telegram_conn_id='telegram_default')
+        with self.assertRaises(airflow.exceptions.AirflowException) as e:
+            hook.send_message({"chat_id": -420913222})
+
+        self.assertEqual("'text' must be provided for telegram message", str(e.exception))
+
+    @mock.patch('airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn')
+    def test_should_send_message_if_all_parameters_are_correctly_provided(self, mock_get_conn):
+        """Explicit chat_id and text are forwarded with parse_mode and disable_web_page_preview."""
+        mock_get_conn.return_value = mock.Mock(password="some_token")
+
+        hook = TelegramHook(telegram_conn_id='telegram_default')
+        hook.send_message({"chat_id": -420913222, "text": "test telegram message"})
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_message.assert_called_once_with(
+            chat_id=-420913222,
+            parse_mode='HTML',
+            disable_web_page_preview=True,
+            text='test telegram message',
+        )
+
+    @mock.patch('airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn')
+    def test_should_send_message_if_chat_id_is_provided_through_constructor(self, mock_get_conn):
+        """A chat_id given to the constructor is used when the message itself has none."""
+        mock_get_conn.return_value = mock.Mock(password="some_token")
+
+        hook = TelegramHook(telegram_conn_id='telegram_default', chat_id=-420913222)
+        hook.send_message({"text": "test telegram message"})
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_message.assert_called_once_with(
+            chat_id=-420913222,
+            parse_mode='HTML',
+            disable_web_page_preview=True,
+            text='test telegram message',
+        )
+
+    @mock.patch('airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn')
+    def test_should_send_message_if_chat_id_is_provided_in_connection(self, mock_get_conn):
+        """The connection's host value is used as chat_id when none is given elsewhere."""
+        mock_get_conn.return_value = mock.Mock(password="some_token")
+
+        hook = TelegramHook(telegram_conn_id='telegram-webhook-with-chat_id')
+        hook.send_message({"text": "test telegram message"})
+
+        mock_get_conn.assert_called_once()
+        # chat_id is taken from the connection's host field, so it arrives as a string.
+        mock_get_conn.return_value.send_message.assert_called_once_with(
+            chat_id="-420913222",
+            parse_mode='HTML',
+            disable_web_page_preview=True,
+            text='test telegram message',
+        )
+
+    @mock.patch('airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn')
+    def test_should_retry_when_any_telegram_error_is_encountered(self, mock_get_conn):
+        """A TelegramError raised by the bot is retried and finally reported as a RetryError.
+
+        The mocked send_message is expected to have been called five times in total.
+        """
+        expected_retry_count = 5
+        mock_get_conn.return_value = mock.Mock(password="some_token")
+        # An exception instance as side_effect is raised on every call to the mock.
+        mock_get_conn.return_value.send_message.side_effect = telegram.error.TelegramError(
+            "cosmic rays caused bit flips"
+        )
+
+        hook = TelegramHook(telegram_conn_id='telegram-webhook-with-chat_id')
+        with self.assertRaises(Exception) as e:
+            hook.send_message({"text": "test telegram message"})
+
+        self.assertIn("RetryError", str(e.exception))
+        self.assertIn("state=finished raised TelegramError", str(e.exception))
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_message.assert_called_with(
+            chat_id="-420913222",
+            parse_mode='HTML',
+            disable_web_page_preview=True,
+            text='test telegram message',
+        )
+        self.assertEqual(expected_retry_count, mock_get_conn.return_value.send_message.call_count)
+
+    @mock.patch('airflow.providers.telegram.hooks.telegram.TelegramHook.get_conn')
+    def test_should_send_message_if_token_is_provided(self, mock_get_conn):
+        """A token passed directly to the hook is enough; no connection id is needed."""
+        mock_get_conn.return_value = mock.Mock(password="some_token")
+
+        hook = TelegramHook(token=TELEGRAM_TOKEN, chat_id=-420913222)
+        hook.send_message({"text": "test telegram message"})
+
+        mock_get_conn.assert_called_once()
+        mock_get_conn.return_value.send_message.assert_called_once_with(
+            chat_id=-420913222,
+            parse_mode='HTML',
+            disable_web_page_preview=True,
+            text='test telegram message',
+        )
diff --git a/tests/providers/telegram/operators/__init__.py b/tests/providers/telegram/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/telegram/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/telegram/operators/test_telegram.py b/tests/providers/telegram/operators/test_telegram.py
new file mode 100644
index 0000000..b1629f3
--- /dev/null
+++ b/tests/providers/telegram/operators/test_telegram.py
@@ -0,0 +1,149 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+import telegram
+
+import airflow
+from airflow.models import Connection
+from airflow.providers.telegram.operators.telegram import TelegramOperator
+from airflow.utils import db
+
+TELEGRAM_TOKEN = "xxx:xxx"  # dummy token; stored as the password of the test connections
+
+
+class TestTelegramOperator(unittest.TestCase):
+    """Tests for ``TelegramOperator`` with ``TelegramHook`` patched wherever ``execute`` runs."""
+
+    def setUp(self):
+        """Register two Telegram connections via ``db.merge_conn``; one carries a chat id in ``host``."""
+        db.merge_conn(
+            Connection(
+                conn_id='telegram_default',
+                conn_type='http',
+                password=TELEGRAM_TOKEN,
+            )
+        )
+        db.merge_conn(
+            Connection(
+                conn_id='telegram_default-with-chat-id',
+                conn_type='http',
+                password=TELEGRAM_TOKEN,
+                host="-420913222",
+            )
+        )
+
+    @mock.patch('airflow.providers.telegram.operators.telegram.TelegramHook')
+    def test_should_send_message_when_all_parameters_are_provided(self, mock_telegram_hook):
+        mock_telegram_hook.return_value.send_message.return_value = True
+
+        operator = TelegramOperator(
+            telegram_conn_id='telegram_default',
+            chat_id='-420913222',
+            task_id='telegram',
+            text="some non empty text",
+        )
+        operator.execute()
+
+        # The hook class is mocked, so this checks how the operator builds it and what it sends.
+        mock_telegram_hook.assert_called_once_with(
+            telegram_conn_id='telegram_default',
+            chat_id='-420913222',
+            token=None,
+        )
+        mock_telegram_hook.return_value.send_message.assert_called_once_with(
+            {'text': 'some non empty text'},
+        )
+
+    def test_should_throw_exception_if_connection_id_is_none(self):
+        with self.assertRaises(airflow.exceptions.AirflowException) as e:
+            TelegramOperator(task_id="telegram", telegram_conn_id=None)
+
+        self.assertEqual("No valid Telegram connection id supplied.", str(e.exception))
+
+    @mock.patch('airflow.providers.telegram.operators.telegram.TelegramHook')
+    def test_should_throw_exception_if_telegram_hook_throws_any_exception(self, mock_telegram_hook):
+        error = telegram.error.TelegramError("cosmic rays caused bit flips")
+        mock_telegram_hook.return_value.send_message.side_effect = error
+
+        operator = TelegramOperator(
+            telegram_conn_id='telegram_default',
+            task_id='telegram',
+            text="some non empty text",
+        )
+        # Build the operator outside the context manager so only ``execute`` may satisfy the assertion.
+        with self.assertRaises(telegram.error.TelegramError) as e:
+            operator.execute()
+
+        self.assertEqual("cosmic rays caused bit flips", str(e.exception))
+
+    @mock.patch('airflow.providers.telegram.operators.telegram.TelegramHook')
+    def test_should_forward_all_args_to_telegram(self, mock_telegram_hook):
+        mock_telegram_hook.return_value.send_message.return_value = True
+
+        operator = TelegramOperator(
+            telegram_conn_id='telegram_default',
+            chat_id='-420913222',
+            task_id='telegram',
+            text="some non empty text",
+            telegram_kwargs={"custom_arg": "value"},
+        )
+        operator.execute()
+
+        mock_telegram_hook.assert_called_once_with(
+            telegram_conn_id='telegram_default',
+            chat_id='-420913222',
+            token=None,
+        )
+        mock_telegram_hook.return_value.send_message.assert_called_once_with(
+            {'custom_arg': 'value', 'text': 'some non empty text'},
+        )
+
+    @mock.patch('airflow.providers.telegram.operators.telegram.TelegramHook')
+    def test_should_give_precedence_to_text_passed_in_constructor(self, mock_telegram_hook):
+        mock_telegram_hook.return_value.send_message.return_value = True
+
+        operator = TelegramOperator(
+            telegram_conn_id='telegram_default',
+            chat_id='-420913222',
+            task_id='telegram',
+            text="some non empty text - higher precedence",
+            telegram_kwargs={"custom_arg": "value", "text": "some text, that will be ignored"},
+        )
+        operator.execute()
+
+        mock_telegram_hook.assert_called_once_with(
+            telegram_conn_id='telegram_default',
+            chat_id='-420913222',
+            token=None,
+        )
+        # telegram_kwargs also carries a "text" entry; the constructor's ``text`` must be the one sent.
+        mock_telegram_hook.return_value.send_message.assert_called_once_with(
+            {'custom_arg': 'value', 'text': 'some non empty text - higher precedence'},
+        )
+
+    def test_should_return_template_fields(self):
+        operator = TelegramOperator(
+            telegram_conn_id='telegram_default',
+            chat_id='-420913222',
+            task_id='telegram',
+            text="some non empty text - higher precedence",
+            telegram_kwargs={"custom_arg": "value", "text": "some text, that will be ignored"},
+        )
+        self.assertEqual(('text', 'chat_id'), operator.template_fields)