You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/05/17 18:25:12 UTC

[airflow] branch master updated: Add Asana Provider (#14521)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 162e320  Add Asana Provider (#14521)
162e320 is described below

commit 162e3204c000ca2ebdc42b9b90a1873d4362ed6e
Author: Jennifer Melot <jt...@gmail.com>
AuthorDate: Mon May 17 14:24:59 2021 -0400

    Add Asana Provider (#14521)
---
 CONTRIBUTING.rst                                   |   2 +-
 INSTALL                                            |   2 +-
 airflow/providers/asana/CHANGELOG.rst              |  24 ++
 airflow/providers/asana/README.md                  |  18 ++
 airflow/providers/asana/__init__.py                |  17 ++
 airflow/providers/asana/example_dags/__init__.py   |  17 ++
 .../providers/asana/example_dags/example_asana.py  |  77 ++++++
 airflow/providers/asana/hooks/__init__.py          |  17 ++
 airflow/providers/asana/hooks/asana.py             | 282 +++++++++++++++++++++
 airflow/providers/asana/operators/__init__.py      |  17 ++
 airflow/providers/asana/operators/asana_tasks.py   | 170 +++++++++++++
 airflow/providers/asana/provider.yaml              |  44 ++++
 dev/provider_packages/prepare_provider_packages.py |   2 +-
 docs/apache-airflow-providers-asana/commits.rst    |  27 ++
 .../connections/asana.rst                          |  31 +++
 docs/apache-airflow-providers-asana/index.rst      | 123 +++++++++
 .../operators/asana.rst                            |  88 +++++++
 docs/apache-airflow/extra-packages-ref.rst         |   2 +
 docs/spelling_wordlist.txt                         |   3 +-
 .../run_install_and_test_provider_packages.sh      |   8 +-
 setup.py                                           |   2 +
 tests/core/test_providers_manager.py               |   5 +
 tests/providers/asana/__init__.py                  |  17 ++
 tests/providers/asana/hooks/__init__.py            |  17 ++
 tests/providers/asana/hooks/test_asana.py          | 254 +++++++++++++++++++
 tests/providers/asana/operators/__init__.py        |  17 ++
 .../providers/asana/operators/test_asana_tasks.py  | 102 ++++++++
 27 files changed, 1377 insertions(+), 8 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index e8418d8..8328f57 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -579,7 +579,7 @@ This is the full list of those extras:
 
 airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
 apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
-apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
+apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
 cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, devel_all, devel_ci,
 devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp,
 gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc,
diff --git a/INSTALL b/INSTALL
index 1891589..c488f73 100644
--- a/INSTALL
+++ b/INSTALL
@@ -91,7 +91,7 @@ The list of available extras:
 
 airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
 apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
-apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
+apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
 cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, devel_all, devel_ci,
 devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp,
 gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc,
diff --git a/airflow/providers/asana/CHANGELOG.rst b/airflow/providers/asana/CHANGELOG.rst
new file mode 100644
index 0000000..a152fd0
--- /dev/null
+++ b/airflow/providers/asana/CHANGELOG.rst
@@ -0,0 +1,24 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/airflow/providers/asana/README.md b/airflow/providers/asana/README.md
new file mode 100644
index 0000000..ef14aff
--- /dev/null
+++ b/airflow/providers/asana/README.md
@@ -0,0 +1,18 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
diff --git a/airflow/providers/asana/__init__.py b/airflow/providers/asana/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/asana/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/asana/example_dags/__init__.py b/airflow/providers/asana/example_dags/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/asana/example_dags/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/asana/example_dags/example_asana.py b/airflow/providers/asana/example_dags/example_asana.py
new file mode 100644
index 0000000..8034bbe
--- /dev/null
+++ b/airflow/providers/asana/example_dags/example_asana.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG showing how to use the Asana create, find, update, and delete task operators.
+"""
+
+from airflow import DAG
+from airflow.providers.asana.operators.asana_tasks import (
+    AsanaCreateTaskOperator,
+    AsanaDeleteTaskOperator,
+    AsanaFindTaskOperator,
+    AsanaUpdateTaskOperator,
+)
+from airflow.utils.dates import days_ago
+
+default_args = {
+    "owner": "airflow",
+}
+
+
+with DAG(
+    "example_asana",
+    default_args=default_args,
+    start_date=days_ago(1),
+    tags=["example"],
+) as dag:
+    conn_id = "asana_test"
+
+    # [START run_asana_create_task_operator]
+    create = AsanaCreateTaskOperator(
+        task_id="run_asana_create_task",
+        task_parameters={"projects": "your_project"},
+        conn_id=conn_id,
+        name="Test Task Create",
+    )
+    # [END run_asana_create_task_operator]
+
+    # [START run_asana_find_task_operator]
+    find = AsanaFindTaskOperator(
+        task_id="run_asana_find_task",
+        search_parameters={"project": "your_project"},
+        conn_id=conn_id,
+    )
+    # [END run_asana_find_task_operator]
+
+    # [START run_asana_update_task_operator]
+    update = AsanaUpdateTaskOperator(
+        task_id="run_asana_update_task",
+        asana_task_gid="your_task_id",
+        task_parameters={"notes": "This task was updated!", "completed": True},
+        conn_id=conn_id,
+    )
+    # [END run_asana_update_task_operator]
+
+    # [START run_asana_delete_task_operator]
+    delete = AsanaDeleteTaskOperator(
+        task_id="run_asana_delete_task",
+        conn_id=conn_id,
+        asana_task_gid="your_task_id",
+    )
+    # [END run_asana_delete_task_operator]
+
+    create >> find >> update >> delete
diff --git a/airflow/providers/asana/hooks/__init__.py b/airflow/providers/asana/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/asana/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/asana/hooks/asana.py b/airflow/providers/asana/hooks/asana.py
new file mode 100644
index 0000000..ca8c4c3
--- /dev/null
+++ b/airflow/providers/asana/hooks/asana.py
@@ -0,0 +1,282 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Connect to Asana."""
+from typing import Any, Dict
+
+from asana import Client
+from asana.error import NotFoundError
+from cached_property import cached_property
+
+from airflow.hooks.base import BaseHook
+
+
+class AsanaHook(BaseHook):
+    """Wrapper around Asana Python client library."""
+
+    conn_name_attr = "asana_conn_id"
+    default_conn_name = "asana_default"
+    conn_type = "asana"
+    hook_name = "Asana"
+
+    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.connection = self.get_connection(conn_id)
+        extras = self.connection.extra_dejson
+        self.workspace = extras.get("extra__asana__workspace") or None
+        self.project = extras.get("extra__asana__project") or None
+
+    def get_conn(self) -> Client:
+        return self.client
+
+    @staticmethod
+    def get_connection_form_widgets() -> Dict[str, Any]:
+        """Returns connection widgets to add to connection form"""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import StringField
+
+        return {
+            "extra__asana__workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
+            "extra__asana__project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
+        }
+
+    @staticmethod
+    def get_ui_field_behaviour() -> Dict:
+        """Returns custom field behaviour"""
+        return {
+            "hidden_fields": ["port", "host", "login", "schema"],
+            "relabeling": {},
+            "placeholders": {
+                "password": "Asana personal access token",
+                "extra__asana__workspace": "Asana workspace gid",
+                "extra__asana__project": "Asana project gid",
+            },
+        }
+
+    @cached_property
+    def client(self) -> Client:
+        """Instantiates python-asana Client"""
+        if not self.connection.password:
+            raise ValueError(
+                "Asana connection password must contain a personal access token: "
+                "https://developers.asana.com/docs/personal-access-token"
+            )
+
+        return Client.access_token(self.connection.password)
+
+    def create_task(self, task_name: str, params: dict) -> dict:
+        """
+        Creates an Asana task.
+
+        :param task_name: Name of the new task
+        :param params: Other task attributes, such as due_on, parent, and notes. For a complete list
+            of possible parameters, see https://developers.asana.com/docs/create-a-task
+        :return: A dict of attributes of the created task, including its gid
+        """
+        merged_params = self._merge_create_task_parameters(task_name, params)
+        self._validate_create_task_parameters(merged_params)
+        response = self.client.tasks.create(params=merged_params)  # pylint: disable=no-member
+        return response
+
+    def _merge_create_task_parameters(self, task_name: str, task_params: dict) -> dict:
+        """
+        Merge create_task parameters with default params from the connection.
+
+        :param task_name: Name of the task
+        :param task_params: Other task parameters which should override defaults from the connection
+        :return: A dict of merged parameters to use in the new task
+        """
+        merged_params = {"name": task_name}
+        if self.project:
+            merged_params["projects"] = [self.project]
+        # Only use default workspace if user did not provide a project id
+        elif self.workspace and not (task_params and ("projects" in task_params)):
+            merged_params["workspace"] = self.workspace
+        if task_params:
+            merged_params.update(task_params)
+        return merged_params
+
+    @staticmethod
+    def _validate_create_task_parameters(params: dict) -> None:
+        """
+        Check that user provided minimal parameters for task creation.
+
+        :param params: A dict of attributes the task to be created should have
+        :return: None; raises ValueError if `params` doesn't contain required parameters
+        """
+        required_parameters = {"workspace", "projects", "parent"}
+        if required_parameters.isdisjoint(params):
+            raise ValueError(
+                f"You must specify at least one of {required_parameters} in the create_task parameters"
+            )
+
+    def delete_task(self, task_id: str) -> dict:
+        """
+        Deletes an Asana task.
+
+        :param task_id: Asana GID of the task to delete
+        :return: A dict containing the response from Asana
+        """
+        try:
+            response = self.client.tasks.delete_task(task_id)  # pylint: disable=no-member
+            return response
+        except NotFoundError:
+            self.log.info("Asana task %s not found for deletion.", task_id)
+            return {}
+
+    def find_task(self, params: dict) -> list:
+        """
+        Retrieves a list of Asana tasks that match search parameters.
+
+        :param params: Attributes that matching tasks should have. For a list of possible parameters,
+            see https://developers.asana.com/docs/get-multiple-tasks
+        :return: A list of dicts containing attributes of matching Asana tasks
+        """
+        merged_params = self._merge_find_task_parameters(params)
+        self._validate_find_task_parameters(merged_params)
+        response = self.client.tasks.find_all(params=merged_params)  # pylint: disable=no-member
+        return list(response)
+
+    def _merge_find_task_parameters(self, search_parameters: dict) -> dict:
+        """
+        Merge find_task parameters with default params from the connection.
+
+        :param search_parameters: Attributes that tasks matching the search should have; these override
+            defaults from the connection
+        :return: A dict of merged parameters to use in the search
+        """
+        merged_params = {}
+        if self.project:
+            merged_params["project"] = self.project
+        # Only use default workspace if user did not provide a project id
+        elif self.workspace and not (search_parameters and ("project" in search_parameters)):
+            merged_params["workspace"] = self.workspace
+        if search_parameters:
+            merged_params.update(search_parameters)
+        return merged_params
+
+    @staticmethod
+    def _validate_find_task_parameters(params: dict) -> None:
+        """
+        Check that the user provided minimal search parameters.
+
+        :param params: Dict of parameters to be used in the search
+        :return: None; raises ValueError if search parameters do not contain minimum required attributes
+        """
+        one_of_list = {"project", "section", "tag", "user_task_list"}
+        both_of_list = {"assignee", "workspace"}
+        contains_both = both_of_list.issubset(params)
+        contains_one = not one_of_list.isdisjoint(params)
+        if not (contains_both or contains_one):
+            raise ValueError(
+                f"You must specify at least one of {one_of_list} "
+                f"or both of {both_of_list} in the find_task parameters."
+            )
+
+    def update_task(self, task_id: str, params: dict) -> dict:
+        """
+        Updates an existing Asana task.
+
+        :param task_id: Asana GID of task to update
+        :param params: New values of the task's attributes. For a list of possible parameters, see
+            https://developers.asana.com/docs/update-a-task
+        :return: A dict containing the updated task's attributes
+        """
+        response = self.client.tasks.update(task_id, params)  # pylint: disable=no-member
+        return response
+
+    def create_project(self, params: dict) -> dict:
+        """
+        Creates a new project.
+
+        :param params: Attributes that the new project should have. See
+            https://developers.asana.com/docs/create-a-project#create-a-project-parameters
+            for a list of possible parameters.
+        :return: A dict containing the new project's attributes, including its GID.
+        """
+        merged_params = self._merge_project_parameters(params)
+        self._validate_create_project_parameters(merged_params)
+        response = self.client.projects.create(merged_params)  # pylint: disable=no-member
+        return response
+
+    @staticmethod
+    def _validate_create_project_parameters(params: dict) -> None:
+        """
+        Check that user provided the minimum required parameters for project creation
+
+        :param params: Attributes that the new project should have
+        :return: None; raises a ValueError if `params` does not contain the minimum required attributes.
+        """
+        required_parameters = {"workspace", "team"}
+        if required_parameters.isdisjoint(params):
+            raise ValueError(
+                f"You must specify at least one of {required_parameters} in the create_project params"
+            )
+
+    def _merge_project_parameters(self, params: dict) -> dict:
+        """
+        Merge parameters passed into a project method with default params from the connection.
+
+        :param params: Parameters passed into one of the project methods, which should override
+            defaults from the connection
+        :return: A dict of merged parameters
+        """
+        merged_params = {} if self.workspace is None else {"workspace": self.workspace}
+        merged_params.update(params)
+        return merged_params
+
+    def find_project(self, params: dict) -> list:
+        """
+        Retrieves a list of Asana projects that match search parameters.
+
+        :param params: Attributes which matching projects should have. See
+            https://developers.asana.com/docs/get-multiple-projects
+            for a list of possible parameters.
+        :return: A list of dicts containing attributes of matching Asana projects
+        """
+        merged_params = self._merge_project_parameters(params)
+        response = self.client.projects.find_all(merged_params)  # pylint: disable=no-member
+        return list(response)
+
+    def update_project(self, project_id: str, params: dict) -> dict:
+        """
+        Updates an existing project.
+
+        :param project_id: Asana GID of the project to update
+        :param params: New attributes that the project should have. See
+            https://developers.asana.com/docs/update-a-project#update-a-project-parameters
+            for a list of possible parameters
+        :return: A dict containing the updated project's attributes
+        """
+        response = self.client.projects.update(project_id, params)  # pylint: disable=no-member
+        return response
+
+    def delete_project(self, project_id: str) -> dict:
+        """
+        Deletes a project.
+
+        :param project_id: Asana GID of the project to delete
+        :return: A dict containing the response from Asana
+        """
+        try:
+            response = self.client.projects.delete(project_id)  # pylint: disable=no-member
+            return response
+        except NotFoundError:
+            self.log.info("Asana project %s not found for deletion.", project_id)
+            return {}
diff --git a/airflow/providers/asana/operators/__init__.py b/airflow/providers/asana/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/asana/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/asana/operators/asana_tasks.py b/airflow/providers/asana/operators/asana_tasks.py
new file mode 100644
index 0000000..e6ea0c8
--- /dev/null
+++ b/airflow/providers/asana/operators/asana_tasks.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.asana.hooks.asana import AsanaHook
+
+
+class AsanaCreateTaskOperator(BaseOperator):
+    """
+    This operator can be used to create Asana tasks. For more information on
+    Asana optional task parameters, see https://developers.asana.com/docs/create-a-task
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AsanaCreateTaskOperator`
+
+    :param conn_id: The Asana connection to use.
+    :type conn_id: str
+    :param name: Name of the Asana task.
+    :type name: str
+    :param task_parameters: Any of the optional task creation parameters.
+        See https://developers.asana.com/docs/create-a-task for a complete list.
+        You must specify at least one of 'workspace', 'parent', or 'projects'
+        either here or in the connection.
+    :type task_parameters: dict
+    """
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,
+        name: str,
+        task_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.conn_id = conn_id
+        self.name = name
+        self.task_parameters = task_parameters
+
+    def execute(self, context: Dict) -> str:
+        hook = AsanaHook(conn_id=self.conn_id)
+        response = hook.create_task(self.name, self.task_parameters)
+        self.log.info(response)
+        return response["gid"]
+
+
+class AsanaUpdateTaskOperator(BaseOperator):
+    """
+    This operator can be used to update Asana tasks.
+    For more information on Asana optional task parameters, see
+    https://developers.asana.com/docs/update-a-task
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AsanaUpdateTaskOperator`
+
+    :param conn_id: The Asana connection to use.
+    :type conn_id: str
+    :param asana_task_gid: Asana task ID to update
+    :type asana_task_gid: str
+    :param task_parameters: Any task parameters that should be updated.
+        See https://developers.asana.com/docs/update-a-task for a complete list.
+    :type task_parameters: dict
+    """
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,
+        asana_task_gid: str,
+        task_parameters: dict,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.conn_id = conn_id
+        self.asana_task_gid = asana_task_gid
+        self.task_parameters = task_parameters
+
+    def execute(self, context: Dict) -> None:
+        hook = AsanaHook(conn_id=self.conn_id)
+        response = hook.update_task(self.asana_task_gid, self.task_parameters)
+        self.log.info(response)
+
+
+class AsanaDeleteTaskOperator(BaseOperator):
+    """
+    This operator can be used to delete Asana tasks.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AsanaDeleteTaskOperator`
+
+    :param conn_id: The Asana connection to use.
+    :type conn_id: str
+    :param asana_task_gid: Asana Task ID to delete.
+    :type asana_task_gid: str
+    """
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,
+        asana_task_gid: str,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.conn_id = conn_id
+        self.asana_task_gid = asana_task_gid
+
+    def execute(self, context: Dict) -> None:
+        hook = AsanaHook(conn_id=self.conn_id)
+        response = hook.delete_task(self.asana_task_gid)
+        self.log.info(response)
+
+
+class AsanaFindTaskOperator(BaseOperator):
+    """
+    This operator can be used to retrieve Asana tasks that match various filters.
+    See https://developers.asana.com/docs/get-multiple-tasks for a list of possible filters.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AsanaFindTaskOperator`
+
+    :param conn_id: The Asana connection to use.
+    :type conn_id: str
+    :param search_parameters: The parameters used to find relevant tasks. You must
+        specify at least one of `project`, `section`, `tag`, `user_task_list`, or both
+        `assignee` and `workspace` either here or in the connection.
+    :type search_parameters: dict
+    """
+
+    def __init__(
+        self,
+        *,
+        conn_id: str,
+        search_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.conn_id = conn_id
+        self.search_parameters = search_parameters
+
+    def execute(self, context: Dict) -> list:
+        hook = AsanaHook(conn_id=self.conn_id)
+        response = hook.find_task(self.search_parameters)
+        self.log.info(response)
+        return response
diff --git a/airflow/providers/asana/provider.yaml b/airflow/providers/asana/provider.yaml
new file mode 100644
index 0000000..f3914f4
--- /dev/null
+++ b/airflow/providers/asana/provider.yaml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-asana
+name: Asana
+description: |
+    `Asana <https://app.asana.com/>`__
+
+versions:
+  - 1.0.0
+integrations:
+  - integration-name: Asana
+    external-doc-url: https://developers.asana.com/docs
+    how-to-guide:
+      - /docs/apache-airflow-providers-asana/operators/asana.rst
+    tags: [software]
+
+operators:
+  - integration-name: Asana
+    python-modules:
+      - airflow.providers.asana.operators.asana_tasks
+
+hooks:
+  - integration-name: Asana
+    python-modules:
+      - airflow.providers.asana.hooks.asana
+
+hook-class-names:
+  - airflow.providers.asana.hooks.asana.AsanaHook
diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py
index e245389..6b711f5 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -1476,7 +1476,7 @@ def get_all_changes_for_package(
     changes_table += changes_table_for_version
     if verbose:
         print_changes_table(changes_table)
-    return True, array_of_changes[0], changes_table
+    return True, array_of_changes[0] if len(array_of_changes) > 0 else None, changes_table
 
 
 def get_provider_details(provider_package_id: str) -> ProviderPackageDetails:
diff --git a/docs/apache-airflow-providers-asana/commits.rst b/docs/apache-airflow-providers-asana/commits.rst
new file mode 100644
index 0000000..1e48fbf
--- /dev/null
+++ b/docs/apache-airflow-providers-asana/commits.rst
@@ -0,0 +1,27 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Package apache-airflow-providers-asana
+------------------------------------------------------
+
+`Asana <https://developers.asana.com/>`__
+
+
+This is a detailed commit list of changes for versions of the provider package: ``asana``.
+For high-level changelog, see :doc:`package information including changelog <index>`.
diff --git a/docs/apache-airflow-providers-asana/connections/asana.rst b/docs/apache-airflow-providers-asana/connections/asana.rst
new file mode 100644
index 0000000..5d6a58d
--- /dev/null
+++ b/docs/apache-airflow-providers-asana/connections/asana.rst
@@ -0,0 +1,31 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Asana Connection
+================
+The Asana connection provides credentials for accessing the Asana API.
+
+Configuring the Connection
+--------------------------
+Password (required)
+    Specify a `personal access token <https://developers.asana.com/docs/personal-access-token>`_ for the account.
+Workspace (optional)
+    Specify a default workspace to use in requests.
+Project (optional)
+    Specify a default project to use in requests.
diff --git a/docs/apache-airflow-providers-asana/index.rst b/docs/apache-airflow-providers-asana/index.rst
new file mode 100644
index 0000000..54e1385
--- /dev/null
+++ b/docs/apache-airflow-providers-asana/index.rst
@@ -0,0 +1,123 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+``apache-airflow-providers-asana``
+==================================
+
+Content
+-------
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Guides
+
+    Connection types <connections/asana>
+    Operators <operators/asana>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: References
+
+    Python API <_api/airflow/providers/asana/index>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Resources
+
+    Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/asana/example_dags>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Resources
+
+    PyPI Repository <https://pypi.org/project/apache-airflow-providers-asana/>
+
+.. THE REMINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME!
+
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Commits
+
+    Detailed list of commits <commits>
+
+
+Package apache-airflow-providers-asana
+------------------------------------------------------
+
+`Asana <https://developers.asana.com>`__
+
+
+Release: 1.0.0
+
+Provider package
+----------------
+
+This is a provider package for the ``asana`` provider. All classes for this provider package
+are in ``airflow.providers.asana`` python package.
+
+Installation
+------------
+
+.. note::
+
+    In November 2020, a new version of PIP (20.3) was released with a new, 2020 resolver. This resolver
+    does not yet work with Apache Airflow and might lead to errors in installation, depending on your choice
+    of extras. In order to install Airflow you need to either downgrade pip to version 20.2.4
+    ``pip install --upgrade pip==20.2.4`` or, in case you use Pip 20.3, you need to add option
+    ``--use-deprecated legacy-resolver`` to your pip install command.
+
+
+You can install this package on top of an existing airflow 2.* installation via
+``pip install apache-airflow-providers-asana``
+
+PIP requirements
+----------------
+
+=============  ==================
+PIP package    Version required
+=============  ==================
+``asana``      ``>=0.10``
+=============  ==================
+
+
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/docs/apache-airflow-providers-asana/operators/asana.rst b/docs/apache-airflow-providers-asana/operators/asana.rst
new file mode 100644
index 0000000..28389fb
--- /dev/null
+++ b/docs/apache-airflow-providers-asana/operators/asana.rst
@@ -0,0 +1,88 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+.. _howto/operator:AsanaCreateTaskOperator:
+
+AsanaCreateTaskOperator
+=======================
+
+Use the :class:`~airflow.providers.asana.operators.asana_tasks.AsanaCreateTaskOperator` to
+create an Asana task.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The AsanaCreateTaskOperator minimally requires the new task's name and
+the Asana connection to use to connect to your account (``conn_id``). There are many other
+`task attributes you can specify <https://developers.asana.com/docs/create-a-task>`_
+through the ``task_parameters``. You must specify at least one of ``workspace``,
+``parent``, or ``projects`` in the ``task_parameters`` or in the connection.
+
+
+.. _howto/operator:AsanaDeleteTaskOperator:
+
+AsanaDeleteTaskOperator
+=======================
+
+Use the :class:`~airflow.providers.asana.operators.asana_tasks.AsanaDeleteTaskOperator` to
+delete an existing Asana task.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The AsanaDeleteTaskOperator requires the task id to delete. Use the ``conn_id``
+parameter to specify the Asana connection to use to connect to your account.
+
+
+.. _howto/operator:AsanaFindTaskOperator:
+
+AsanaFindTaskOperator
+=======================
+
+Use the :class:`~airflow.providers.asana.operators.asana_tasks.AsanaFindTaskOperator` to
+search for Asana tasks that fit some criteria.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The AsanaFindTaskOperator requires a dict of search parameters following the description
+`here <https://developers.asana.com/docs/get-multiple-tasks>`_.
+Use the ``conn_id`` parameter to specify the Asana connection to use to connect
+to your account. Any parameters provided through the connection will be used in the
+search if not overridden in the ``search_parameters``.
+
+.. _howto/operator:AsanaUpdateTaskOperator:
+
+AsanaUpdateTaskOperator
+=======================
+
+Use the :class:`~airflow.providers.asana.operators.asana_tasks.AsanaUpdateTaskOperator` to
+update an existing Asana task.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The AsanaUpdateTaskOperator minimally requires the task id to update and
+the Asana connection to use to connect to your account (``conn_id``). There are many other
+`task attributes you can overwrite <https://developers.asana.com/docs/update-a-task>`_
+through the ``task_parameters``.
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index 3dcd15b..7d218ab 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -148,6 +148,8 @@ Those are extras that add dependencies needed for integration with external serv
 +---------------------+-----------------------------------------------------+-----------------------------------------------------+
 | amazon              | ``pip install 'apache-airflow[amazon]'``            | Amazon Web Services                                 |
 +---------------------+-----------------------------------------------------+-----------------------------------------------------+
+| asana               | ``pip install 'apache-airflow[asana]'``             | Asana hooks and operators                           |
++---------------------+-----------------------------------------------------+-----------------------------------------------------+
 | azure               | ``pip install 'apache-airflow[microsoft.azure]'``   | Microsoft Azure                                     |
 +---------------------+-----------------------------------------------------+-----------------------------------------------------+
 | cloudant            | ``pip install 'apache-airflow[cloudant]'``          | Cloudant hook                                       |
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 0dcd734..3c38bbf 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -17,6 +17,7 @@ Ansible
 AppBuilder
 Arg
 Args
+Asana
 Async
 Atlassian
 Auth
@@ -149,7 +150,6 @@ Flink
 FluentD
 Fokko
 Formaturas
-Fspark
 Fundera
 GCS
 GH
@@ -472,6 +472,7 @@ args
 arn
 arraysize
 artwr
+asana
 asc
 ascii
 asciiart
diff --git a/scripts/in_container/run_install_and_test_provider_packages.sh b/scripts/in_container/run_install_and_test_provider_packages.sh
index 9dd8540..983e7a3 100755
--- a/scripts/in_container/run_install_and_test_provider_packages.sh
+++ b/scripts/in_container/run_install_and_test_provider_packages.sh
@@ -95,7 +95,7 @@ function discover_all_provider_packages() {
     # Columns is to force it wider, so it doesn't wrap at 80 characters
     COLUMNS=180 airflow providers list
 
-    local expected_number_of_providers=66
+    local expected_number_of_providers=67
     local actual_number_of_providers
     actual_providers=$(airflow providers list --output yaml | grep package_name)
     actual_number_of_providers=$(wc -l <<<"$actual_providers")
@@ -118,7 +118,7 @@ function discover_all_hooks() {
     group_start "Listing available hooks via 'airflow providers hooks'"
     COLUMNS=180 airflow providers hooks
 
-    local expected_number_of_hooks=64
+    local expected_number_of_hooks=65
     local actual_number_of_hooks
     actual_number_of_hooks=$(airflow providers hooks --output table | grep -c "| apache" | xargs)
     if [[ ${actual_number_of_hooks} != "${expected_number_of_hooks}" ]]; then
@@ -157,7 +157,7 @@ function discover_all_connection_form_widgets() {
 
     COLUMNS=180 airflow providers widgets
 
-    local expected_number_of_widgets=42
+    local expected_number_of_widgets=44
     local actual_number_of_widgets
     actual_number_of_widgets=$(airflow providers widgets --output table | grep -c ^extra)
     if [[ ${actual_number_of_widgets} != "${expected_number_of_widgets}" ]]; then
@@ -176,7 +176,7 @@ function discover_all_field_behaviours() {
     group_start "Listing connections with custom behaviours via 'airflow providers behaviours'"
     COLUMNS=180 airflow providers behaviours
 
-    local expected_number_of_connections_with_behaviours=20
+    local expected_number_of_connections_with_behaviours=21
     local actual_number_of_connections_with_behaviours
     actual_number_of_connections_with_behaviours=$(airflow providers behaviours --output table | grep -v "===" | \
         grep -v field_behaviours | grep -cv "^ " | xargs)
diff --git a/setup.py b/setup.py
index b057f51..df7bc01 100644
--- a/setup.py
+++ b/setup.py
@@ -200,6 +200,7 @@ amazon = [
 apache_beam = [
     'apache-beam>=2.20.0',
 ]
+asana = ['asana>=0.10', 'cached-property>=1.5.2']
 async_packages = [
     'eventlet>= 0.9.7',
     'gevent>=0.13',
@@ -556,6 +557,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = {
     'apache.pinot': pinot,
     'apache.spark': spark,
     'apache.sqoop': [],
+    'asana': asana,
     'celery': celery,
     'cloudant': cloudant,
     'cncf.kubernetes': kubernetes,
diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py
index e6f26ce..f42bb18 100644
--- a/tests/core/test_providers_manager.py
+++ b/tests/core/test_providers_manager.py
@@ -34,6 +34,7 @@ ALL_PROVIDERS = [
     'apache-airflow-providers-apache-pinot',
     'apache-airflow-providers-apache-spark',
     'apache-airflow-providers-apache-sqoop',
+    'apache-airflow-providers-asana',
     'apache-airflow-providers-celery',
     'apache-airflow-providers-cloudant',
     'apache-airflow-providers-cncf-kubernetes',
@@ -90,6 +91,7 @@ ALL_PROVIDERS = [
 ]
 
 CONNECTIONS_LIST = [
+    'asana',
     'aws',
     'azure',
     'azure_batch',
@@ -157,6 +159,8 @@ CONNECTIONS_LIST = [
 ]
 
 CONNECTION_FORM_WIDGETS = [
+    'extra__asana__project',
+    'extra__asana__workspace',
     'extra__azure__subscriptionId',
     'extra__azure__tenantId',
     'extra__azure_batch__account_url',
@@ -202,6 +206,7 @@ CONNECTION_FORM_WIDGETS = [
 ]
 
 CONNECTIONS_WITH_FIELD_BEHAVIOURS = [
+    'asana',
     'azure',
     'azure_batch',
     'azure_container_registry',
diff --git a/tests/providers/asana/__init__.py b/tests/providers/asana/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/asana/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/asana/hooks/__init__.py b/tests/providers/asana/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/asana/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/asana/hooks/test_asana.py b/tests/providers/asana/hooks/test_asana.py
new file mode 100644
index 0000000..220b239
--- /dev/null
+++ b/tests/providers/asana/hooks/test_asana.py
@@ -0,0 +1,254 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+from unittest.mock import patch
+
+from asana import Client
+
+from airflow.models import Connection
+from airflow.providers.asana.hooks.asana import AsanaHook
+
+
+class TestAsanaHook(unittest.TestCase):
+    """
+    Tests for AsanaHook Asana client retrieval
+    """
+
+    def test_asana_client_retrieved(self):
+        """
+        Test that we successfully retrieve an Asana client given a Connection with complete information.
+        :return: None
+        """
+        with patch.object(
+            AsanaHook, "get_connection", return_value=Connection(conn_type="asana", password="test")
+        ):
+            hook = AsanaHook()
+        client = hook.get_conn()
+        self.assertIsInstance(client, Client)
+
+    def test_missing_password_raises(self):
+        """
+        Test that the Asana hook raises an exception if password not provided in connection.
+        :return: None
+        """
+        with patch.object(AsanaHook, "get_connection", return_value=Connection(conn_type="asana")):
+            hook = AsanaHook()
+        with self.assertRaises(ValueError):
+            hook.get_conn()
+
+    def test_merge_create_task_parameters_default_project(self):
+        """
+        Test that merge_create_task_parameters correctly merges the default and method parameters when we
+        do not override the default project.
+        :return: None
+        """
+        conn = Connection(conn_type="asana", password="test", extra='{"extra__asana__project": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"name": "test", "projects": ["1"]}
+        self.assertEqual(
+            expected_merged_params, hook._merge_create_task_parameters("test", {})  # pylint: disable=W0212
+        )
+
+    def test_merge_create_task_parameters_specified_project(self):
+        """
+        Test that merge_create_task_parameters correctly merges the default and method parameters when we
+        override the default project.
+        :return: None
+        """
+        conn = Connection(conn_type="asana", password="test", extra='{"extra__asana__project": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"name": "test", "projects": ["1", "2"]}
+        self.assertEqual(
+            expected_merged_params,
+            hook._merge_create_task_parameters("test", {"projects": ["1", "2"]}),  # pylint: disable=W0212
+        )
+
+    def test_merge_create_task_parameters_specified_workspace(self):
+        """
+        Test that merge_create_task_parameters correctly merges the default and method parameters when we
+        do not override the default workspace.
+        :return: None
+        """
+        conn = Connection(conn_type="asana", password="test", extra='{"extra__asana__workspace": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"name": "test", "workspace": "1"}
+        self.assertEqual(
+            expected_merged_params, hook._merge_create_task_parameters("test", {})  # pylint: disable=W0212
+        )
+
+    def test_merge_create_task_parameters_default_project_overrides_default_workspace(self):
+        """
+        Test that merge_create_task_parameters uses the default project over the default workspace
+        if it is available
+        :return: None
+        """
+        conn = Connection(
+            conn_type="asana",
+            password="test",
+            extra='{"extra__asana__workspace": "1", "extra__asana__project": "1"}',
+        )
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"name": "test", "projects": ["1"]}
+        self.assertEqual(
+            expected_merged_params, hook._merge_create_task_parameters("test", {})  # pylint: disable=W0212
+        )
+
+    def test_merge_create_task_parameters_specified_project_overrides_default_workspace(self):
+        """
+        Test that merge_create_task_parameters uses the method parameter project over the default workspace
+        if it is available
+        :return: None
+        """
+        conn = Connection(
+            conn_type="asana",
+            password="test",
+            extra='{"extra__asana__workspace": "1"}',
+        )
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"name": "test", "projects": ["2"]}
+        self.assertEqual(
+            expected_merged_params,
+            hook._merge_create_task_parameters("test", {"projects": ["2"]}),  # pylint: disable=W0212
+        )
+
+    def test_merge_find_task_parameters_default_project(self):
+        """
+        Test that merge_find_task_parameters correctly merges the default and method parameters when we
+        do not override the default project.
+        :return: None
+        """
+        conn = Connection(conn_type="asana", password="test", extra='{"extra__asana__project": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"project": "1"}
+        self.assertEqual(
+            expected_merged_params, hook._merge_find_task_parameters({})  # pylint: disable=W0212
+        )
+
+    def test_merge_find_task_parameters_specified_project(self):
+        """
+        Test that merge_find_task_parameters correctly merges the default and method parameters when we
+        do override the default project.
+        :return: None
+        """
+        conn = Connection(conn_type="asana", password="test", extra='{"extra__asana__project": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"project": "2"}
+        self.assertEqual(
+            expected_merged_params,
+            hook._merge_find_task_parameters({"project": "2"}),  # pylint: disable=W0212
+        )
+
+    def test_merge_find_task_parameters_default_workspace(self):
+        """
+        Test that merge_find_task_parameters correctly merges the default and method parameters when we
+        do not override the default workspace.
+        :return: None
+        """
+        conn = Connection(conn_type="asana", password="test", extra='{"extra__asana__workspace": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"workspace": "1", "assignee": "1"}
+        self.assertEqual(
+            expected_merged_params,
+            hook._merge_find_task_parameters({"assignee": "1"}),  # pylint: disable=W0212
+        )
+
+    def test_merge_find_task_parameters_specified_workspace(self):
+        """
+        Test that merge_find_task_parameters correctly merges the default and method parameters when we
+        do override the default workspace.
+        :return: None
+        """
+        conn = Connection(conn_type="asana", password="test", extra='{"extra__asana__workspace": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"workspace": "2", "assignee": "1"}
+        self.assertEqual(
+            expected_merged_params,
+            hook._merge_find_task_parameters({"workspace": "2", "assignee": "1"}),  # pylint: disable=W0212
+        )
+
+    def test_merge_find_task_parameters_default_project_overrides_workspace(self):
+        """
+        Test that merge_find_task_parameters uses the default project over the workspace if it is available
+        :return: None
+        """
+        conn = Connection(
+            conn_type="asana",
+            password="test",
+            extra='{"extra__asana__workspace": "1", "extra__asana__project": "1"}',
+        )
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"project": "1"}
+        self.assertEqual(
+            expected_merged_params, hook._merge_find_task_parameters({})  # pylint: disable=W0212
+        )
+
+    def test_merge_find_task_parameters_specified_project_overrides_workspace(self):
+        """
+        Test that merge_find_task_parameters uses the method parameter project over the default workspace
+        if it is available
+        :return: None
+        """
+        conn = Connection(
+            conn_type="asana",
+            password="test",
+            extra='{"extra__asana__workspace": "1"}',
+        )
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"project": "2"}
+        self.assertEqual(
+            expected_merged_params,
+            hook._merge_find_task_parameters({"project": "2"}),  # pylint: disable=W0212
+        )
+
+    def test_merge_project_parameters(self):
+        """
+        Tests that default workspace is used if not overridden
+        :return: None
+        """
+        conn = Connection(conn_type="asana", password="test", extra='{"extra__asana__workspace": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"workspace": "1", "name": "name"}
+        self.assertEqual(
+            expected_merged_params, hook._merge_project_parameters({"name": "name"})  # pylint: disable=W0212
+        )
+
+    def test_merge_project_parameters_override(self):
+        """
+        Tests that default workspace is successfully overridden
+        :return: None
+        """
+        conn = Connection(conn_type='asana', password='test', extra='{"extra__asana__workspace": "1"}')
+        with patch.object(AsanaHook, "get_connection", return_value=conn):
+            hook = AsanaHook()
+        expected_merged_params = {"workspace": "2"}
+        self.assertEqual(
+            expected_merged_params,
+            hook._merge_project_parameters({"workspace": "2"}),  # pylint: disable=W0212
+        )
diff --git a/tests/providers/asana/operators/__init__.py b/tests/providers/asana/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/asana/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/asana/operators/test_asana_tasks.py b/tests/providers/asana/operators/test_asana_tasks.py
new file mode 100644
index 0000000..9a653bc
--- /dev/null
+++ b/tests/providers/asana/operators/test_asana_tasks.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest.mock import Mock, patch
+
+from airflow.models import Connection
+from airflow.models.dag import DAG
+from airflow.providers.asana.operators.asana_tasks import (
+    AsanaCreateTaskOperator,
+    AsanaDeleteTaskOperator,
+    AsanaFindTaskOperator,
+    AsanaUpdateTaskOperator,
+)
+from airflow.utils import db, timezone
+
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)  # DAG start date and the run window of every test
+TEST_DAG_ID = "unit_test_dag"  # id of the DAG built in setUp
+asana_client_mock = Mock(name="asana_client_for_test")  # passed as return_value when patching Client
+
+
+class TestAsanaTaskOperators(unittest.TestCase):
+    """
+    Test that the AsanaTaskOperators are using the python-asana methods as expected.
+
+    Every test patches ``Client`` in the hook module and asserts on the ``tasks`` methods of
+    ``Client.access_token.return_value``, the object these tests expect the hook to call.
+    """
+
+    def setUp(self):
+        """Create the DAG the operators attach to and register the ``asana_test`` connection."""
+        self.dag = DAG(TEST_DAG_ID, default_args={"owner": "airflow", "start_date": DEFAULT_DATE})
+        db.merge_conn(Connection(conn_id="asana_test", conn_type="asana", password="test"))
+
+    @patch("airflow.providers.asana.hooks.asana.Client", autospec=True, return_value=asana_client_mock)
+    def test_asana_create_task_operator(self, asana_client):
+        """
+        Tests that the AsanaCreateTaskOperator makes the expected call to python-asana given valid arguments.
+        """
+        asana_client.access_token.return_value.tasks.create.return_value = {"gid": "1"}
+        create_task = AsanaCreateTaskOperator(
+            task_id="create_task",
+            conn_id="asana_test",
+            name="test",
+            task_parameters={"workspace": "1"},
+            dag=self.dag,
+        )
+        create_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+        assert asana_client.access_token.return_value.tasks.create.called
+
+    @patch("airflow.providers.asana.hooks.asana.Client", autospec=True, return_value=asana_client_mock)
+    def test_asana_find_task_operator(self, asana_client):
+        """
+        Tests that the AsanaFindTaskOperator makes the expected call to python-asana given valid arguments.
+        """
+        # Stub ``tasks.find_all`` (the call asserted below) with one matching task.
+        asana_client.access_token.return_value.tasks.find_all.return_value = [{"gid": "1"}]
+        find_task = AsanaFindTaskOperator(
+            task_id="find_task", conn_id="asana_test", search_parameters={"project": "test"}, dag=self.dag
+        )
+        find_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+        assert asana_client.access_token.return_value.tasks.find_all.called
+
+    @patch("airflow.providers.asana.hooks.asana.Client", autospec=True, return_value=asana_client_mock)
+    def test_asana_update_task_operator(self, asana_client):
+        """
+        Tests that the AsanaUpdateTaskOperator makes the expected call to python-asana given valid arguments.
+        """
+        update_task = AsanaUpdateTaskOperator(
+            task_id="update_task",
+            conn_id="asana_test",
+            asana_task_gid="test",
+            task_parameters={"completed": True},
+            dag=self.dag,
+        )
+        update_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+        assert asana_client.access_token.return_value.tasks.update.called
+
+    @patch("airflow.providers.asana.hooks.asana.Client", autospec=True, return_value=asana_client_mock)
+    def test_asana_delete_task_operator(self, asana_client):
+        """
+        Tests that the AsanaDeleteTaskOperator makes the expected call to python-asana given valid arguments.
+        """
+        delete_task = AsanaDeleteTaskOperator(
+            task_id="delete_task", conn_id="asana_test", asana_task_gid="test", dag=self.dag
+        )
+        delete_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+        assert asana_client.access_token.return_value.tasks.delete_task.called