Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/24 08:09:21 UTC

[GitHub] [airflow] potiuk opened a new pull request #8991: All classes in backport providers are now importable in Airflow 1.10

potiuk opened a new pull request #8991:
URL: https://github.com/apache/airflow/pull/8991


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429937479



##########
File path: requirements/requirements-python3.7.txt
##########
@@ -372,7 +371,7 @@ virtualenv==20.0.21
 watchtower==0.7.3
 wcwidth==0.1.9
 websocket-client==0.57.0
-wrapt==1.12.1
+wrapt==1.11.2

Review comment:
    This is what the current PyPI set of dependencies produces when I run the --eager upgrade during generate-requirements. It was most likely pulled in by a transitive dependency.
   
   
    As long as they work (they do: all the tests pass), I do not care too much, and do not have to. Those are just constraint files; the real requirements and limits are in setup.py.
   
    We should remember that those files are a by-product, generated automatically, so they will change pretty much any time setup.py changes, and in pretty much unpredictable ways depending on the transitive requirements. This is the same problem we have always had, but what we have now is that we capture a "working set of requirements" that we not only know can be installed but that also passes all our tests (because those changes can only be merged after all tests pass).
   
    And this is the set of requirements that a person running 'pip install airflow' today will get anyway, and we at least have a chance to see whether it works. So as long as all tests pass, this is exactly what we need. And I am happy, and frankly, I have no intention of spending any time chasing which transitive requirement caused it.
   
   But feel free to chase it if you really want though :).
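
    For illustration only (the version range below is made up, not the actual pin), the split described above looks roughly like this: setup.py carries the permissive, authoritative ranges, while the generated requirements-python3.7.txt records one concrete combination that installed and passed the tests:

    ```python
    # Hypothetical setup.py excerpt - the ranges here are the "real" requirements.
    from setuptools import setup

    setup(
        name="apache-airflow",
        install_requires=[
            # Illustrative range; the generated constraint file then pins whatever
            # concrete version (e.g. wrapt==1.11.2) the eager upgrade resolved to
            # and the test suite accepted.
            "wrapt~=1.11",
        ],
    )
    ```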
   
   







[GitHub] [airflow] BasPH commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
BasPH commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429612732



##########
File path: airflow/providers/odbc/hooks/odbc.py
##########
@@ -24,7 +24,23 @@
 import pyodbc
 
 from airflow.hooks.dbapi_hook import DbApiHook
-from airflow.utils.helpers import merge_dicts
+
+
+# We do not import it from airflow.utils because merge_dicts is not available in Airflow 1.10
+# So this operator would not be backportable
+def merge_dicts(dict1, dict2):

Review comment:
    I understand this change comes from master to make things compatible, but I think we should remove it entirely (in another PR). This is a one-liner in native Python: `{**dict1, **dict2}`, so there is no need for a custom dict merge function.
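
    A minimal sketch of the equivalence being suggested (plain Python, no Airflow imports; this assumes a shallow merge is enough - if the inlined helper does a recursive merge of nested dicts, the one-liner would not be a drop-in replacement):

    ```python
    def merge_dicts(dict1, dict2):
        """Shallow merge: keys from dict2 override keys from dict1."""
        merged = dict1.copy()
        merged.update(dict2)
        return merged

    a = {"driver": "ODBC Driver 17 for SQL Server", "timeout": 5}
    b = {"timeout": 30}

    # The native one-liner gives the same result for flat dicts:
    assert merge_dicts(a, b) == {**a, **b} == {"driver": "ODBC Driver 17 for SQL Server", "timeout": 30}
    ```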

##########
File path: backport_packages/import_all_provider_classes.py
##########
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import traceback
+from inspect import isclass
+from typing import List
+
+
+def import_all_provider_classes(source_path: str,

Review comment:
       nice one!







[GitHub] [airflow] kaxil edited a comment on pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#issuecomment-633699747


   > I mean we should do `print(..., file=sys.stdout)` for anything that is an "error"
   
    I haven't followed the PR, I just read the line :D "I mean we should do print(..., file=sys.stdout) for anything that is an 'error'", hence my comment ;)





[GitHub] [airflow] turbaszek commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429846802



##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,703 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery files that should not be backported.
+        :param src: source directory being copied
+        :param names: names of the entries in that directory
+        :return: list of names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imports of modules back to their deprecated 1.10 locations. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context to usages of Python/BranchPython operators (in example DAGs).
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +
+                     self.druid_ingest_conn_id = druid_ingest_conn_id
+                     self.timeout = timeout
+                     self.max_ingestion_time = max_ingestion_time
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_super_init_call_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for ch in node.post_order():
+                if isinstance(ch, Leaf) and ch.value == "super":
+                    if any(c.value for c in ch.parent.post_order() if isinstance(c, Leaf)):
+                        ch.parent.remove()
+
+        self.qry.select_subclass("BaseHook").modify(remove_super_init_call_modifier)
+
+    def remove_tags(self):
+        """
+        Removes the tags argument from DAG() calls (in example_dags). Example diff generated:
+
+        .. code-block:: diff
+
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            @@ -83,8 +83,7 @@
+             with models.DAG(
+                 "example_datasync_2",
+                 default_args=default_args,
+            -    schedule_interval=None,  # Override to match your needs
+            -    tags=['example'],
+            +    schedule_interval=None,
+             ) as dag:
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_tags_modifier(_: LN, capture: Capture, filename: Filename) -> None:
+            for node in capture['function_arguments'][0].post_order():
+                if isinstance(node, Leaf) and node.value == "tags" and node.type == TOKEN.NAME:
+                    if node.parent.next_sibling and node.parent.next_sibling.value == ",":
+                        node.parent.next_sibling.remove()
+                    node.parent.remove()
+
+        # Remove tags
+        self.qry.select_method("DAG").is_call().modify(remove_tags_modifier)
+
+    def remove_poke_mode_only_decorator(self):
+        """
+        Removes @poke_mode_only decorator. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/sensors/gcs.py
+            +++ ./airflow/providers/google/cloud/sensors/gcs.py
+            @@ -189,7 +189,6 @@
+                 return datetime.now()
+
+
+            -@poke_mode_only
+             class GCSUploadSessionCompleteSensor(BaseSensorOperator):
+                 \"\"\"
+                Checks for changes in the number of objects at prefix in Google Cloud Storage
+
+        """
+        def find_and_remove_poke_mode_only_import(node: LN):
+            for child in node.children:
+                if isinstance(child, Leaf) and child.type == 1 and child.value == 'poke_mode_only':
+                    import_node = child.parent
+                    # remove the import by default
+                    skip_import_remove = False
+                    if isinstance(child.prev_sibling, Leaf) and child.prev_sibling.value == ",":
+                        # remove the comma before the imported name
+                        child.prev_sibling.remove()
+                        # do not remove if there are other imports
+                        skip_import_remove = True
+                    if isinstance(child.next_sibling, Leaf) and child.next_sibling.value == ",":
+                        # but keep the one after and do not remove the whole import
+                        skip_import_remove = True
+                    # remove the import
+                    child.remove()
+                    if not skip_import_remove:
+                        # remove the whole import if there were no sibling imports
+                        import_node.remove()
+                else:
+                    find_and_remove_poke_mode_only_import(child)
+
+        def find_root_remove_import(node: LN):
+            current_node = node
+            while current_node.parent:
+                current_node = current_node.parent
+            find_and_remove_poke_mode_only_import(current_node)
+
+        def is_poke_mode_only_decorator(node: LN) -> bool:
+            return node.children and len(node.children) >= 2 and \
+                isinstance(node.children[0], Leaf) and node.children[0].value == '@' and \
+                isinstance(node.children[1], Leaf) and node.children[1].value == 'poke_mode_only'
+
+        # noinspection PyUnusedLocal
+        def remove_poke_mode_only_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for child in capture['node'].parent.children:
+                if is_poke_mode_only_decorator(child):
+                    find_root_remove_import(child)
+                    child.remove()
+
+        self.qry.select_subclass("BaseSensorOperator").modify(remove_poke_mode_only_modifier)
+
+    def refactor_amazon_package(self):
+        """
+        Fixes to "amazon" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy typing_compat.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/operators/ecs.py
+            +++ ./airflow/providers/amazon/aws/operators/ecs.py
+            @@ -24,7 +24,7 @@
+             from airflow.models import BaseOperator
+             from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+             from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+            -from airflow.typing_compat import Protocol, runtime_checkable
+            +from airflow.providers.amazon.common.utils.typing_compat import Protocol, runtime_checkable
+             from airflow.utils.decorators import apply_defaults
+
+        """
+
+        # noinspection PyUnusedLocal
+        def amazon_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/amazon/")
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("amazon"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "typing_compat.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "typing_compat.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.typing_compat").
+            filter(callback=amazon_package_filter).
+            rename("airflow.providers.amazon.common.utils.typing_compat")
+        )
+
+    def refactor_google_package(self):
+        """
+        Fixes to "google" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there. Note that in this case we also rename
+        the imports in the copied files.
+
+        For example we copy python_virtualenv.py, process_utils.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -28,11 +28,11 @@
+
+             from airflow.exceptions import AirflowException
+             from airflow.models import BaseOperator
+            -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+            +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+             from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook
+             from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+             from airflow.utils.decorators import apply_defaults
+            -from airflow.utils.process_utils import execute_in_subprocess, patch_environ
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+        And in the copied python_virtualenv.py we also change the import of process_utils.py. This happens
+        automatically and is handled by Bowler.
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/python_virtualenv.py
+            +++ ./airflow/providers/google/common/utils/python_virtualenv.py
+            @@ -21,7 +21,7 @@
+             \"\"\"
+            from typing import List, Optional
+
+            -from airflow.utils.process_utils import execute_in_subprocess
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+            def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool)
+
+
+        We also rename Base operator links to deprecated names:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/mlengine.py
+            +++ ./airflow/providers/google/cloud/operators/mlengine.py
+            @@ -24,7 +24,7 @@
+             from typing import List, Optional
+
+             from airflow.exceptions import AirflowException
+            -from airflow.models import BaseOperator, BaseOperatorLink
+            +from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
+             from airflow.models.taskinstance import TaskInstance
+             from airflow.providers.google.cloud.hooks.mlengine import MLEngineHook
+             from airflow.utils.decorators import apply_defaults
+
+
+        We remove GKEStartPodOperator (example in remove_class method)
+
+
+        We also copy helpers.py (into google.common.utils) and rename the imports to use the copied helpers.
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.google.common.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+        And we also copy module_loading, which is used by helpers:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/helpers.py
+            +++ ./airflow/providers/google/common/utils/helpers.py
+            @@ -26,7 +26,7 @@
+             from jinja2 import Template
+
+             from airflow.exceptions import AirflowException
+            -from airflow.utils.module_loading import import_string
+            +from airflow.providers.google.common.utils.module_loading import import_string
+
+             KEY_REGEX = re.compile(r'^[\\w.-]+$')
+
+        """
+        # noinspection PyUnusedLocal
+        def google_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/google/")
+
+        # noinspection PyUnusedLocal
+        def pure_airflow_models_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            """Check if select is exactly [airflow, . , models]"""
+            return len([ch for ch in node.children[1].leaves()]) == 3
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("google"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "python_virtualenv.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "python_virtualenv.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "helpers.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "module_loading.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "module_loading.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.python_virtualenv").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.python_virtualenv")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "process_utils.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "process_utils.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.process_utils").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.process_utils")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.helpers")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.module_loading").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.module_loading")
+        )
+
+        (
+            # Fix BaseOperatorLinks imports
+            self.qry.select_module("airflow.models").
+            is_filename(include=r"bigquery\.py|mlengine\.py").
+            filter(callback=google_package_filter).
+            filter(pure_airflow_models_filter).
+            rename("airflow.models.baseoperator")
+        )
+        self.remove_class("GKEStartPodOperator")
+        (
+            self.qry.
+            select_class("GKEStartPodOperator").
+            filter(callback=google_package_filter).
+            is_filename(include=r"example_kubernetes_engine\.py").
+            rename("GKEPodOperator")
+        )
+
+    def refactor_odbc_package(self):
+        """
+        Fixes to "odbc" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy helpers.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.odbc.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+
+        """
+        # noinspection PyUnusedLocal
+        def odbc_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/odbc/")
+
+        os.makedirs(os.path.join(get_target_providers_folder(), "odbc", "utils"), exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("odbc"), "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py"),
+            os.path.join(get_target_providers_package_folder("odbc"), "utils", "helpers.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=odbc_package_filter).
+            rename("airflow.providers.odbc.utils.helpers")
+        )
+
+    def refactor_papermill_package(self):
+        """
+        Fixes to "papermill" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy the lineage package (its __init__.py and entities.py) and we change the import as in the example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/papermill/example_dags/example_papermill.py
+            +++ ./airflow/providers/papermill/example_dags/example_papermill.py
+            @@ -26,8 +26,8 @@
+             import scrapbook as sb
+
+             from airflow import DAG
+            -from airflow.lineage import AUTO
+            -from airflow.operators.python import PythonOperator
+            +from airflow.providers.papermill.utils.lineage import AUTO
+            +from airflow.operators.python_operator import PythonOperator
+             from airflow.providers.papermill.operators.papermill import PapermillOperator
+             from airflow.utils.dates import days_ago
+             from airflow.version import version
+
+
+        Note also that the copied lineage __init__.py needs to be refactored as well, because it uses
+        the Operator class (which does not exist in Airflow 1.10.*). We have a base operator template
+        prepared that imports BaseOperator as Operator; we copy it as "base.py" into the
+        papermill.utils package (from template_base_operator.py) and rename the import to use it from there:
+
+        .. code-block:: diff
+
+            +++ ./airflow/providers/papermill/utils/lineage/__init__.py
+            @@ -27,7 +27,7 @@
+             import jinja2
+             from cattr import structure, unstructure
+
+            -from airflow.models.base import Operator
+            +from airflow.providers.papermill.utils.base import Operator
+             from airflow.utils.module_loading import import_string
+
+             ENV = jinja2.Environment()
+
+        """
+        # noinspection PyUnusedLocal
+        def papermill_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/papermill/")
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("papermill"), "utils", "lineage"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("papermill"), "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "lineage", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("papermill"), "utils", "lineage", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "lineage", "entities.py"),
+            os.path.join(get_target_providers_package_folder("papermill"), "utils", "lineage", "entities.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "backport_packages", "template_base_operator.py.txt"),
+            os.path.join(get_target_providers_package_folder("papermill"), "utils", "base.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.lineage.entities").
+            filter(callback=papermill_package_filter).
+            rename("airflow.providers.papermill.utils.lineage.entities")
+        )
+        (
+            self.qry.
+            select_module("airflow.lineage").
+            filter(callback=papermill_package_filter).
+            rename("airflow.providers.papermill.utils.lineage")
+        )
+        # Papermill uses lineage which uses Operator under the hood so we need to change it as well
+        (
+            self.qry.
+            select_module("airflow.models.base").
+            filter(callback=papermill_package_filter).
+            rename("airflow.providers.papermill.utils.base")
+        )
+
+    def do_refactor(self, in_process: bool = False) -> None:
+        self.rename_deprecated_modules()
+        self.refactor_amazon_package()
+        self.refactor_google_package()
+        self.refactor_odbc_package()
+        self.refactor_papermill_package()
+        self.remove_tags()
+        self.remove_super_init_call()
+        self.add_provide_context_to_python_operators()
+        self.remove_poke_mode_only_decorator()
+        # In order to debug Bowler - set in_process to True
+        self.qry.execute(write=True, silent=False, interactive=False, in_process=in_process)

Review comment:
       Looks nice and this is definitely more readable 👌







[GitHub] [airflow] potiuk commented on pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#issuecomment-633195777


   @basph @kaxil @ashb as discussed -> I've added automated imports for all classes in providers in 1.10. A few fixes were needed, but otherwise it looks good :)





[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429954852



##########
File path: backport_packages/import_all_provider_classes.py
##########
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import traceback
+from inspect import isclass
+from typing import List
+
+
+def import_all_provider_classes(source_path: str,
+                                provider_ids: List[str] = None,
+                                print_imports: bool = False) -> List[str]:
+    """
+    Imports all classes in providers packages. This method loads and imports
+    all the classes found in providers, so that we can find all the subclasses
+    of operators/sensors etc.
+
+    :param source_path: path to look for sources; might be None to look for all packages in all source paths
+    :param provider_ids: provider ids that should be loaded
+    :param print_imports: whether each imported class should also be printed in the output
+    :return: list of all imported classes
+    """
+    if provider_ids:
+        prefixed_provider_paths = [source_path + "/airflow/providers/" + provider_id.replace(".", "/")
+                                   for provider_id in provider_ids]
+    else:
+        prefixed_provider_paths = [source_path + "/airflow/providers/"]
+
+    imported_classes = []
+    tracebacks = []
+    for root, dirs, files in os.walk(source_path):
+        if all([not root.startswith(prefix_provider_path)
+                for prefix_provider_path in prefixed_provider_paths]) or root.endswith("__pycache__"):
+            # Skip loading module if it is not in the list of providers that we are looking for
+            continue
+        package_name = root[len(source_path) + 1:].replace("/", ".")
+        for file in files:
+            if file.endswith(".py"):
+                module_name = package_name + "." + file[:-3] if file != "__init__.py" else package_name
+                if print_imports:
+                    print(f"Importing module: {module_name}")
+                # noinspection PyBroadException
+                try:
+                    _module = importlib.import_module(module_name)
+                    for attribute_name in dir(_module):
+                        class_name = module_name + "." + attribute_name
+                        attribute = getattr(_module, attribute_name)
+                        if isclass(attribute):
+                            if print_imports:
+                                print(f"Imported {class_name}")
+                            imported_classes.append(class_name)
+                except Exception:
+                    exception_str = traceback.format_exc()
+                    tracebacks.append(exception_str)
+    if tracebacks:
+        print()
+        print("ERROR: There were some import errors")
+        print()
+        for trace in tracebacks:
+            print("----------------------------------------")
+            print(trace)
+            print("----------------------------------------")
+        sys.exit(1)

Review comment:
    Why? What's the reasoning? This is just a plain script to run with "python script.py". I just wonder what the benefit of using sys.stdout instead would be?
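
    For context, a generic illustration (not the PR's code; the function and variable names are made up): print() writes to sys.stdout by default, and the usual argument for sending errors to sys.stderr via the file= argument is that a caller can redirect normal output while still seeing failures on the console:

    ```python
    import sys

    def report(imported_classes, tracebacks):
        # Progress goes to the default stream (stdout).
        print(f"Imported {len(imported_classes)} classes")
        # Errors go to stderr, so `python script.py > classes.txt`
        # still shows the tracebacks on the terminal.
        for trace in tracebacks:
            print(trace, file=sys.stderr)
        if tracebacks:
            sys.exit(1)
    ```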
   

##########
File path: airflow/providers/apache/hive/operators/hive.py
##########
@@ -95,8 +95,11 @@ def __init__(
         self.mapred_queue = mapred_queue
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
-        self.mapred_job_name_template = conf.get('hive',
-                                                 'mapred_job_name_template')
+        self.mapred_job_name_template = conf.get(
+            'hive', 'mapred_job_name_template', fallback='')
+        if self.mapred_job_name_template == '':
+            self.mapred_job_name_template = "Airflow HiveOperator task for " \
+                                            "{hostname}.{dag_id}.{task_id}.{execution_date}"

Review comment:
    OK. I think I've spent far too much time on this one just because it is screwed up. Yeah, I added ~ in the default value and it seems to work fine with fallback now in all combinations. The problem is that it behaves differently in tests and when you simply import the operator: depending on the configuration of this variable, the import works or not ... :shrug:
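
    As a stand-alone illustration of the fallback semantics in question, using the stdlib configparser that Airflow's config parser is built on (the snippet is stdlib-only; the section and option names just mirror the diff above):

    ```python
    from configparser import ConfigParser

    conf = ConfigParser()
    conf.read_string("[hive]\n")  # the section exists, the option does not

    # Without fallback, a missing option raises NoOptionError;
    # with fallback, the given default is returned instead.
    template = conf.get("hive", "mapred_job_name_template", fallback="")
    if not template:
        template = ("Airflow HiveOperator task for "
                    "{hostname}.{dag_id}.{task_id}.{execution_date}")
    print(template)
    ```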
   

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,750 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery files that should not be backported.
+        :param src: source directory being copied
+        :param names: names of the entries in that directory
+        :return: list of names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helper has two methods (chain, cross_downstream) that were moved from the original helpers module to
+    'airflow.models.baseoperator', so in 1.10 they should re-import the original 'airflow.utils.helpers'
+    methods. Those deprecated methods use import_string("<IMPORT>"), so it is easier to
+    replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imports of modules back to their deprecated 1.10 locations. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they are installed in.
+        Automatically providing context to Python operators is a 2.0.0 feature, and we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - the "Core" operators do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +

Review comment:
       I have a feeling that we discussed it several times already. Glad I am really patient :)
   
    In Airflow 1.10 none of the hooks call super().__init__(). It was always broken in Airflow 1.10: BaseHook has its own __init__(), which is wrongly implemented and requires a source parameter to be passed:
   
   ```
       def __init__(self, source):
           pass
   ```
   
    We fixed it in 2.0, but for the entire 1.10 line calling super().__init__() is not a good idea; it basically does nothing even if you do. It is also bad because it does not initialize LoggingMixin (BaseHook derives from LoggingMixin), and that is the main reason why hook logs sometimes do not work as they are supposed to:
   
   ```
   class LoggingMixin(object):
       """
       Convenience super-class to have a logger configured with the class name
       """
       def __init__(self, context=None):
           self._set_context(context)
   ```
   
    There are two hooks in 1.10 that call super().__init__():
   
   ```
          super(CloudSqlDatabaseHook, self).__init__(source=None)
          super(MongoHook, self).__init__(source='mongo')
   ```
   
    Not that it helps with anything, because __init__() in BaseHook does nothing.
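
    To make the failure mode concrete, a simplified, self-contained sketch of the 1.10 class shapes described above (not the real Airflow code):

    ```python
    class LoggingMixin:
        """Stand-in: the real class wires up logging when its __init__ runs."""
        def __init__(self, context=None):
            self._log_configured = True

    class BaseHook(LoggingMixin):
        """1.10-style BaseHook: takes a mandatory `source` and initializes nothing."""
        def __init__(self, source):
            pass  # never calls super().__init__(), so LoggingMixin is skipped

    class MyHook(BaseHook):
        def __init__(self):
            super().__init__(source=None)  # "works", but still sets nothing up

    print(hasattr(MyHook(), "_log_configured"))  # False: the mixin never ran
    ```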
   

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,750 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery files that should not be backported.
+        :param src: source directory being copied
+        :param names: names of the entries in that directory
+        :return: list of names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from the original
+    helpers to 'airflow.models.baseoperator', so in 1.10 they should re-import the original
+    'airflow.utils.helpers' methods. Those deprecated methods use import_string("<IMPORT>"),
+    so it is easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated (Airflow 1.10) names. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0 the Python operators pass the context automatically, but we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - and in the 1.10 line provide_context=True has to be set explicitly. The "Core" operators
+        do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +

Review comment:
       BTW. I put it as a comment in the fixup I am sending shortly. I hope that will be the last time I am explaining it :)

##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):
+        # AUTO inlets do not work with Airflow 1.* series.
+        check_output = PythonOperator(
+            task_id='check_out',
+            python_callable=check_notebook,
+            input_nb="/tmp/out-{{ execution_date }}.ipynb"

Review comment:
       Yes. Apparently it was a bug in the example_papermill that I copied it from. Changed it to PapermillOperator.
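    
    For reference, a hedged sketch of what that change could look like (the dag_id, dates and notebook paths are illustrative, not the exact values from the PR):
    
    ```python
    from datetime import datetime
    
    from airflow import DAG
    from airflow.providers.papermill.operators.papermill import PapermillOperator
    
    with DAG(dag_id="example_papermill_check", start_date=datetime(2020, 5, 1), schedule_interval=None) as dag:
        check_output = PapermillOperator(
            task_id='check_out',
            input_nb="/tmp/out-{{ execution_date }}.ipynb",    # notebook produced by the previous task
            output_nb="/tmp/checked-{{ execution_date }}.ipynb",
            parameters={"msgs": "Checked from Airflow at {{ execution_date }}!"},
        )
    ```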

##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):

Review comment:
       It won't fail. Lineage is copied to the "papermill" provider and Bowler refactors the code to import it from there. It imports nicely.

##########
File path: backport_packages/import_all_provider_classes.py
##########
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import traceback
+from inspect import isclass
+from typing import List
+
+
+def import_all_provider_classes(source_path: str,
+                                provider_ids: List[str] = None,
+                                print_imports: bool = False) -> List[str]:
+    """
+    Imports all classes in providers packages. This method loads and imports
+    all the classes found in providers, so that we can find all the subclasses
+    of operators/sensors etc.
+
+    :param provider_ids - provider ids that should be loaded.
+    :param print_imports - if imported class should also be printed in output
+    :param source_path: path to look for sources - might be None to look for all packages in all source paths
+    :return: list of all imported classes
+    """
+    if provider_ids:
+        prefixed_provider_paths = [source_path + "/airflow/providers/" + provider_id.replace(".", "/")
+                                   for provider_id in provider_ids]
+    else:
+        prefixed_provider_paths = [source_path + "/airflow/providers/"]
+
+    imported_classes = []
+    tracebacks = []
+    for root, dirs, files in os.walk(source_path):
+        if all([not root.startswith(prefix_provider_path)
+                for prefix_provider_path in prefixed_provider_paths]) or root.endswith("__pycache__"):
+            # Skip loading module if it is not in the list of providers that we are looking for
+            continue
+        package_name = root[len(source_path) + 1:].replace("/", ".")
+        for file in files:
+            if file.endswith(".py"):
+                module_name = package_name + "." + file[:-3] if file != "__init__.py" else package_name
+                if print_imports:
+                    print(f"Importing module: {module_name}")
+                # noinspection PyBroadException
+                try:
+                    _module = importlib.import_module(module_name)
+                    for attribute_name in dir(_module):
+                        class_name = module_name + "." + attribute_name
+                        attribute = getattr(_module, attribute_name)
+                        if isclass(attribute):
+                            if print_imports:
+                                print(f"Imported {class_name}")
+                            imported_classes.append(class_name)
+                except Exception:
+                    exception_str = traceback.format_exc()
+                    tracebacks.append(exception_str)
+    if tracebacks:
+        print()
+        print("ERROR: There were some import errors")
+        print()
+        for trace in tracebacks:
+            print("----------------------------------------")
+            print(trace)
+            print("----------------------------------------")

Review comment:
       No problem. Easy fix. I think I have to be a bit more picky with my reviews. Seems our standards for perfectionism are growing.

##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):

Review comment:
       Sure, I will remove this lineage import and fall back this way. That's even easier for me.

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore bigquery modules (except the DTS ones) and example DAGs that use bigquery.
+
+        :param src: source directory being copied
+        :param names: names of the entries in that directory
+        :return: list of names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from the original
+    helpers to 'airflow.models.baseoperator', so in 1.10 they should re-import the original
+    'airflow.utils.helpers' methods. Those deprecated methods use import_string("<IMPORT>"),
+    so it is easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition

Review comment:
       I just double-checked it - it was there before (done by Tomek), but I see that I only get one node type and neither 300 nor 311. Also, this operation is now restricted to a single package by the filter, so the risk that something wrong gets removed is very small (and we will notice anyway at the package preparation phase and in imports/README generation). If something that should not be removed does get removed, the safety net we have now - importing all classes and generating README files - will show us immediately that something was wrong.
    
    Removed those node types.
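    
    As a side note, node types like these can be inspected with a small sketch like the one below (assuming fissix keeps lib2to3's pytree/pygram helpers, which the imports in this file suggest):
    
    ```python
    # Assumes fissix mirrors lib2to3: node types below 256 are tokens, 256 and above are grammar symbols.
    from fissix import pygram
    from fissix.pytree import type_repr
    
    classdef_type = pygram.python_symbols.classdef
    print(classdef_type)             # numeric id of the 'classdef' symbol (value depends on the grammar)
    print(type_repr(classdef_type))  # -> 'classdef'
    ```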

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore bigquery modules (except the DTS ones) and example DAGs that use bigquery.
+
+        :param src: source directory being copied
+        :param names: names of the entries in that directory
+        :return: list of names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies. airflow/utils/helper.py to a new location within provider package
+
+    The helper has two methods (chain, cross_downstream) that are moved from the original helper to
+    'airflow.models.baseoperator'. so in 1.10 they should reimport the original 'airflow.utils.helper'
+    methods. Those deprecated methods use importe with import_string("<IMPORT>") so it is easier to
+    replace them as strings rather than with Bowler
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated (Airflow 1.10) names. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0 the Python operators pass the context automatically, but we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - and in the 1.10 line provide_context=True has to be set explicitly. The "Core" operators
+        do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").

Review comment:
       Actually, yeah. I am not sure at all why it was limited in the first place (I had not noticed that). I removed the limitations and will see whether we have any problems. I do not expect to. Good catch.
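    
    For clarity, a hedged sketch of the widened query (the search path is illustrative and the modifier is essentially the one from the diff above), no longer limited to a single file:
    
    ```python
    from bowler import LN, Capture, Filename, Query
    from fissix.fixer_util import Comma, KeywordArg, Name
    
    
    def add_provide_context(node: LN, capture: Capture, filename: Filename) -> None:
        # Same idea as the modifier in the diff above: append ", provide_context=True" to the call.
        fn_args = capture['function_arguments'][0]
        fn_args.append_child(Comma())
        arg = KeywordArg(Name('provide_context'), Name('True'))
        arg.prefix = fn_args.children[0].prefix
        fn_args.append_child(arg)
    
    
    (
        Query("./airflow/providers")          # illustrative search path
        .select_function("PythonOperator")
        .is_call()
        .modify(add_provide_context)
        .diff()                               # use .write() to apply the change in place
    )
    ```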

##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):

Review comment:
       Actually the problem is that Papermill is all about lineage. It imports NoteBook and File from the lineage package, and that's why I implemented those workarounds. But yes, let's simply remove it altogether. It makes no sense with all those hacks.
   

##########
File path: backport_packages/template_base_operator.py.txt
##########
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.models.baseoperator import BaseOperator as Operator

Review comment:
       It was needed because of both the rename and the package import at the same time, plus it was easier this way. But I will remove papermill altogether now indeed.

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore files with bigquery
+        :param src: source file
+        :param names: Name of the file
+        :return:
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from the original
+    helpers to 'airflow.models.baseoperator', so in 1.10 they should re-import the original
+    'airflow.utils.helpers' methods. Those deprecated methods use import_string("<IMPORT>"),
+    so it is easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated (Airflow 1.10) names. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0 the Python operators pass the context automatically, but we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - and in the 1.10 line provide_context=True has to be set explicitly. The "Core" operators
+        do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks.
+
+        In Airflow 1.10 almost none of the Hooks call super().__init__(). It was always broken in
+        Airflow 1.10 - BaseHook has its own __init__() which is wrongly implemented and requires a
+        source parameter to be passed:
+
+        .. code-block:: python
+
+            def __init__(self, source):
+                pass
+
+        We fixed it in 2.0, but for the entire 1.10 line calling super().__init__() is not a good idea -
+        it basically does nothing even if you do, and it is harmful because it does not initialize
+        LoggingMixin (BaseHook derives from LoggingMixin). This is the main reason why Hook
+        logs sometimes do not work as they are supposed to:
+
+        .. code-block:: python
+
+            class LoggingMixin(object):
+                \"\"\"
+                Convenience super-class to have a logger configured with the class name
+                \"\"\"
+                def __init__(self, context=None):
+                    self._set_context(context)
+
+
+        There are two Hooks in 1.10 that call super().__init__():
+
+        .. code-block:: python
+
+               super(CloudSqlDatabaseHook, self).__init__(source=None)
+               super(MongoHook, self).__init__(source='mongo')
+
+        Not that it helps with anything, because __init__ in BaseHook does nothing. So we remove
+        the super().__init__() call in Hooks when backporting to 1.10.
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +
+                     self.druid_ingest_conn_id = druid_ingest_conn_id
+                     self.timeout = timeout
+                     self.max_ingestion_time = max_ingestion_time
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_super_init_call_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for ch in node.post_order():
+                if isinstance(ch, Leaf) and ch.value == "super":
+                    if any(c.value for c in ch.parent.post_order() if isinstance(c, Leaf)):
+                        ch.parent.remove()
+
+        self.qry.select_subclass("BaseHook").modify(remove_super_init_call_modifier)
+
+    def remove_tags(self):
+        """
+        Removes the tags argument from DAG() calls (in example_dags). Note that those changes
+        apply to example DAGs, not to the operators/hooks etc. We package the example DAGs together
+        with the provider classes and they should serve as examples independently of the version
+        of Airflow they will be installed in. Tags are a feature added in 1.10.10, and occasionally
+        we will want to run example DAGs as system tests on pre-1.10.10 versions, so we want to
+        remove the tags here.
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+
+            -- ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            @@ -83,8 +83,7 @@
+             with models.DAG(
+                 "example_datasync_2",
+                 default_args=default_args,
+            -    schedule_interval=None,  # Override to match your needs
+            -    tags=['example'],
+            +    schedule_interval=None,
+             ) as dag:
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_tags_modifier(_: LN, capture: Capture, filename: Filename) -> None:
+            for node in capture['function_arguments'][0].post_order():
+                if isinstance(node, Leaf) and node.value == "tags" and node.type == TOKEN.NAME:
+                    if node.parent.next_sibling and node.parent.next_sibling.value == ",":
+                        node.parent.next_sibling.remove()
+                    node.parent.remove()
+
+        # Remove tags
+        self.qry.select_method("DAG").is_call().modify(remove_tags_modifier)
+
+    def remove_poke_mode_only_decorator(self):
+        """
+        Removes @poke_mode_only decorator. The decorator is only available in Airflow 2.0.
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/sensors/gcs.py
+            +++ ./airflow/providers/google/cloud/sensors/gcs.py
+            @@ -189,7 +189,6 @@
+                 return datetime.now()
+
+
+            -@poke_mode_only
+             class GCSUploadSessionCompleteSensor(BaseSensorOperator):
+                 \"\"\"
+                Checks for changes in the number of objects at prefix in Google Cloud Storage
+
+        """
+        def find_and_remove_poke_mode_only_import(node: LN):
+            for child in node.children:
+                if isinstance(child, Leaf) and child.type == 1 and child.value == 'poke_mode_only':
+                    import_node = child.parent
+                    # remove the import by default
+                    skip_import_remove = False
+                    if isinstance(child.prev_sibling, Leaf) and child.prev_sibling.value == ",":
+                        # remove the comma before the whole import
+                        child.prev_sibling.remove()
+                        # do not remove if there are other imports
+                        skip_import_remove = True
+                    if isinstance(child.next_sibling, Leaf) and child.prev_sibling.value == ",":
+                        # but keep the one after and do not remove the whole import
+                        skip_import_remove = True
+                    # remove the import
+                    child.remove()
+                    if not skip_import_remove:
+                        # remove the whole import if there were no other imported names
+                        import_node.remove()
+                else:
+                    find_and_remove_poke_mode_only_import(child)
+
+        def find_root_remove_import(node: LN):
+            current_node = node
+            while current_node.parent:
+                current_node = current_node.parent
+            find_and_remove_poke_mode_only_import(current_node)
+
+        def is_poke_mode_only_decorator(node: LN) -> bool:
+            return node.children and len(node.children) >= 2 and \
+                isinstance(node.children[0], Leaf) and node.children[0].value == '@' and \
+                isinstance(node.children[1], Leaf) and node.children[1].value == 'poke_mode_only'
+
+        # noinspection PyUnusedLocal
+        def remove_poke_mode_only_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for child in capture['node'].parent.children:
+                if is_poke_mode_only_decorator(child):
+                    find_root_remove_import(child)
+                    child.remove()
+
+        self.qry.select_subclass("BaseSensorOperator").modify(remove_poke_mode_only_modifier)
+
+    def refactor_amazon_package(self):
+        """
+        Fixes to "amazon" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy typing_compat.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/operators/ecs.py
+            +++ ./airflow/providers/amazon/aws/operators/ecs.py
+            @@ -24,7 +24,7 @@
+             from airflow.models import BaseOperator
+             from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+             from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+            -from airflow.typing_compat import Protocol, runtime_checkable
+            +from airflow.providers.amazon.common.utils.typing_compat import Protocol, runtime_checkable
+             from airflow.utils.decorators import apply_defaults
+
+        """
+
+        # noinspection PyUnusedLocal
+        def amazon_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/amazon/")
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("amazon"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "typing_compat.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "typing_compat.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.typing_compat").
+            filter(callback=amazon_package_filter).
+            rename("airflow.providers.amazon.common.utils.typing_compat")
+        )
+
+    def refactor_google_package(self):
+        """
+        Fixes to "google" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there. Note that in this case we also rename
+        the imports in the copied files.
+
+        For example we copy python_virtualenv.py, process_utils.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -28,11 +28,11 @@
+
+             from airflow.exceptions import AirflowException
+             from airflow.models import BaseOperator
+            -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+            +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+             from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook
+             from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+             from airflow.utils.decorators import apply_defaults
+            -from airflow.utils.process_utils import execute_in_subprocess, patch_environ
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+        And in the copied python_virtualenv.py we also change the import of process_utils.py. This happens
+        automatically and is handled by Bowler.
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/python_virtualenv.py
+            +++ ./airflow/providers/google/common/utils/python_virtualenv.py
+            @@ -21,7 +21,7 @@
+             \"\"\"
+            from typing import List, Optional
+
+            -from airflow.utils.process_utils import execute_in_subprocess
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+            def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool)
+
+
+        We also rename BaseOperator/BaseOperatorLink imports to their deprecated location:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/mlengine.py
+            +++ ./airflow/providers/google/cloud/operators/mlengine.py
+            @@ -24,7 +24,7 @@
+             from typing import List, Optional
+
+             from airflow.exceptions import AirflowException
+            -from airflow.models import BaseOperator, BaseOperatorLink
+            +from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
+             from airflow.models.taskinstance import TaskInstance
+             from airflow.providers.google.cloud.hooks.mlengine import MLEngineHook
+             from airflow.utils.decorators import apply_defaults
+
+
+        We remove GKEStartPodOperator (example in remove_class method)
+
+
+        We also copy helpers.py (to google.common.utils) and rename imports of the helpers:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.google.common.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+        And also module_loading, which is used by helpers:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/helpers.py
+            +++ ./airflow/providers/google/common/utils/helpers.py
+            @@ -26,7 +26,7 @@
+             from jinja2 import Template
+
+             from airflow.exceptions import AirflowException
+            -from airflow.utils.module_loading import import_string
+            +from airflow.providers.google.common.utils.module_loading import import_string
+
+             KEY_REGEX = re.compile(r'^[\\w.-]+$')
+
+        """
+        # noinspection PyUnusedLocal
+        def google_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/google/")
+
+        # noinspection PyUnusedLocal
+        def pure_airflow_models_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            """Check if select is exactly [airflow, . , models]"""
+            return len([ch for ch in node.children[1].leaves()]) == 3
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("google"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "python_virtualenv.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "python_virtualenv.py")
+        )
+
+        copy_helper_py_file(os.path.join(
+            get_target_providers_package_folder("google"), "common", "utils", "helpers.py"))
+
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "module_loading.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "module_loading.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.python_virtualenv").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.python_virtualenv")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "process_utils.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "process_utils.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.process_utils").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.process_utils")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.helpers")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.module_loading").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.module_loading")
+        )
+
+        (
+            # Fix BaseOperatorLinks imports
+            self.qry.select_module("airflow.models").
+            is_filename(include=r"bigquery\.py|mlengine\.py").
+            filter(callback=google_package_filter).
+            filter(pure_airflow_models_filter).
+            rename("airflow.models.baseoperator")
+        )
+        self.remove_class("GKEStartPodOperator")
+        (
+            self.qry.
+            select_class("GKEStartPodOperator").
+            filter(callback=google_package_filter).
+            is_filename(include=r"example_kubernetes_engine\.py").
+            rename("GKEPodOperator")
+        )
+
+    def refactor_odbc_package(self):
+        """
+        Fixes to "odbc" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy helpers.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.odbc.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+
+        """
+        # noinspection PyUnusedLocal
+        def odbc_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/odbc/")
+
+        os.makedirs(os.path.join(get_target_providers_folder(), "odbc", "utils"), exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("odbc"), "utils", "__init__.py")
+        )
+        copy_helper_py_file(os.path.join(
+            get_target_providers_package_folder("odbc"), "utils", "helpers.py"))
+
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=odbc_package_filter).
+            rename("airflow.providers.odbc.utils.helpers")
+        )
+
+    def refactor_papermill_package(self):
+        """
+        Fixes to "papermill" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy the lineage package (its __init__.py and entities.py) and change the import as in the example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/papermill/example_dags/example_papermill.py
+            +++ ./airflow/providers/papermill/example_dags/example_papermill.py
+            @@ -26,8 +26,8 @@
+             import scrapbook as sb
+
+             from airflow import DAG
+            -from airflow.lineage import AUTO
+            -from airflow.operators.python import PythonOperator
+            +from airflow.providers.papermill.utils.lineage import AUTO
+            +from airflow.operators.python_operator import PythonOperator
+             from airflow.providers.papermill.operators.papermill import PapermillOperator
+             from airflow.utils.dates import days_ago
+             from airflow.version import version
+
+
+        Note also that the copied lineage __init__.py needs to be refactored as well, because it uses
+        the Operator class (which does not exist in Airflow 1.10.*). We have a base operator template
+        prepared that imports the BaseOperator as an Operator; we copy it as "base.py" into the
+        papermill.utils package (from template_base_operator.py) and rename the import to use it from there:

Review comment:
       Yes. I will exclude it. It is in fact very problematic and won't work with lineage at all anyway.
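   
   In case it helps, excluding it could follow the same pattern as the existing copytree ignore
   callbacks (a hypothetical sketch only - the callback name and its wiring into ignore_some_files
   are illustrative, not part of this PR):
   
   ```python
   import os
   from typing import List
   
   
   def ignore_papermill_files(src: str, names: List[str]) -> List[str]:
       """Ignore the papermill example DAG when copying provider sources (hypothetical)."""
       if src.endswith(os.path.sep + "example_dags"):
           return [name for name in names if "example_papermill" in name]
       return []
   ```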




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #8991:
URL: https://github.com/apache/airflow/pull/8991


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429928365



##########
File path: setup.py
##########
@@ -506,7 +506,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
     "facebook": facebook,
     "ftp": [],
     "google": google,
-    "grpc": grpc,
+    "grpc": grpc + google,

Review comment:
       Because this hook uses google https://github.com/apache/airflow/blob/master/airflow/providers/grpc/hooks/grpc.py
   
   ```
   from google import auth as google_auth
   from google.auth import jwt as google_auth_jwt
   from google.auth.transport import (
       grpc as google_auth_transport_grpc, requests as google_auth_transport_requests,
   )
   ```
   
   Surely we can add just one or two google-auth related libraries to grpc instead.
   
   Possibly those two will be enough? WDYT?
   
   ```
   'google-auth>=1.0.0, <2.0.0dev',
   'google-auth-httplib2>=0.0.1',
   ```
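   
   A minimal sketch of what that could look like in setup.py (hypothetical - the grpcio pin below is
   illustrative and stands in for whatever the current grpc list contains; only the google-auth pins
   come from the suggestion above):
   
   ```python
   # Extend only the "grpc" extra with the google-auth packages the GrpcHook imports,
   # instead of pulling in the whole "google" extra.
   grpc = [
       'grpcio>=1.15.0',
       'google-auth>=1.0.0, <2.0.0dev',
       'google-auth-httplib2>=0.0.1',
   ]
   
   EXTRAS_REQUIREMENTS = {
       "grpc": grpc,
   }
   ```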
   
     




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429922628



##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,703 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery-related files.
+        :param src: source directory being copied
+        :param names: names of the files in that directory
+        :return: list of file names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames back to deprecated modules imported. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide context to usages of Python/BranchPython Operators (in example dags).
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +
+                     self.druid_ingest_conn_id = druid_ingest_conn_id
+                     self.timeout = timeout
+                     self.max_ingestion_time = max_ingestion_time
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_super_init_call_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for ch in node.post_order():
+                if isinstance(ch, Leaf) and ch.value == "super":
+                    if any(c.value for c in ch.parent.post_order() if isinstance(c, Leaf)):
+                        ch.parent.remove()
+
+        self.qry.select_subclass("BaseHook").modify(remove_super_init_call_modifier)
+
+    def remove_tags(self):
+        """
+        Removes tags from execution of the operators (in example_dags). Example diff generated:
+
+        .. code-block:: diff
+
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            @@ -83,8 +83,7 @@
+             with models.DAG(
+                 "example_datasync_2",
+                 default_args=default_args,
+            -    schedule_interval=None,  # Override to match your needs
+            -    tags=['example'],
+            +    schedule_interval=None,
+             ) as dag:
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_tags_modifier(_: LN, capture: Capture, filename: Filename) -> None:
+            for node in capture['function_arguments'][0].post_order():
+                if isinstance(node, Leaf) and node.value == "tags" and node.type == TOKEN.NAME:
+                    if node.parent.next_sibling and node.parent.next_sibling.value == ",":
+                        node.parent.next_sibling.remove()
+                    node.parent.remove()
+
+        # Remove tags
+        self.qry.select_method("DAG").is_call().modify(remove_tags_modifier)
+
+    def remove_poke_mode_only_decorator(self):
+        """
+        Removes @poke_mode_only decorator. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/sensors/gcs.py
+            +++ ./airflow/providers/google/cloud/sensors/gcs.py
+            @@ -189,7 +189,6 @@
+                 return datetime.now()
+
+
+            -@poke_mode_only
+             class GCSUploadSessionCompleteSensor(BaseSensorOperator):
+                 \"\"\"
+                Checks for changes in the number of objects at prefix in Google Cloud Storage
+
+        """
+        def find_and_remove_poke_mode_only_import(node: LN):
+            for child in node.children:
+                if isinstance(child, Leaf) and child.type == 1 and child.value == 'poke_mode_only':
+                    import_node = child.parent
+                    # remove the import by default
+                    skip_import_remove = False
+                    if isinstance(child.prev_sibling, Leaf) and child.prev_sibling.value == ",":
+                        # remove comma before the whole import
+                        child.prev_sibling.remove()
+                        # do not remove if there are other imports
+                        skip_import_remove = True
+                    if isinstance(child.next_sibling, Leaf) and child.next_sibling.value == ",":
+                        # but keep the one after and do not remove the whole import
+                        skip_import_remove = True
+                    # remove the import
+                    child.remove()
+                    if not skip_import_remove:
+                        # remove the import if there were no siblings
+                        import_node.remove()
+                else:
+                    find_and_remove_poke_mode_only_import(child)
+
+        def find_root_remove_import(node: LN):
+            current_node = node
+            while current_node.parent:
+                current_node = current_node.parent
+            find_and_remove_poke_mode_only_import(current_node)
+
+        def is_poke_mode_only_decorator(node: LN) -> bool:
+            return node.children and len(node.children) >= 2 and \
+                isinstance(node.children[0], Leaf) and node.children[0].value == '@' and \
+                isinstance(node.children[1], Leaf) and node.children[1].value == 'poke_mode_only'
+
+        # noinspection PyUnusedLocal
+        def remove_poke_mode_only_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for child in capture['node'].parent.children:
+                if is_poke_mode_only_decorator(child):
+                    find_root_remove_import(child)
+                    child.remove()
+
+        self.qry.select_subclass("BaseSensorOperator").modify(remove_poke_mode_only_modifier)
+
+    def refactor_amazon_package(self):
+        """
+        Fixes to "amazon" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy typing_compat.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/operators/ecs.py
+            +++ ./airflow/providers/amazon/aws/operators/ecs.py
+            @@ -24,7 +24,7 @@
+             from airflow.models import BaseOperator
+             from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+             from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+            -from airflow.typing_compat import Protocol, runtime_checkable
+            +from airflow.providers.amazon.common.utils.typing_compat import Protocol, runtime_checkable
+             from airflow.utils.decorators import apply_defaults
+
+        """
+
+        # noinspection PyUnusedLocal
+        def amazon_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/amazon/")
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("amazon"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "typing_compat.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "typing_compat.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.typing_compat").
+            filter(callback=amazon_package_filter).
+            rename("airflow.providers.amazon.common.utils.typing_compat")
+        )
+
+    def refactor_google_package(self):
+        """
+        Fixes to "google" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there. Note that in this case we also rename
+        the imports in the copied files.
+
+        For example we copy python_virtualenv.py, process_utils.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -28,11 +28,11 @@
+
+             from airflow.exceptions import AirflowException
+             from airflow.models import BaseOperator
+            -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+            +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+             from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook
+             from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+             from airflow.utils.decorators import apply_defaults
+            -from airflow.utils.process_utils import execute_in_subprocess, patch_environ
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+        And in the copied python_virtualenv.py we also change import to process_utils.py. This happens
+        automatically and is solved by Pybowler.
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/python_virtualenv.py
+            +++ ./airflow/providers/google/common/utils/python_virtualenv.py
+            @@ -21,7 +21,7 @@
+             \"\"\"
+            from typing import List, Optional
+
+            -from airflow.utils.process_utils import execute_in_subprocess
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+            def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool)
+
+
+        We also rename Base operator links to deprecated names:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/mlengine.py
+            +++ ./airflow/providers/google/cloud/operators/mlengine.py
+            @@ -24,7 +24,7 @@
+             from typing import List, Optional
+
+             from airflow.exceptions import AirflowException
+            -from airflow.models import BaseOperator, BaseOperatorLink
+            +from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
+             from airflow.models.taskinstance import TaskInstance
+             from airflow.providers.google.cloud.hooks.mlengine import MLEngineHook
+             from airflow.utils.decorators import apply_defaults
+
+
+        We remove GKEStartPodOperator (example in remove_class method)
+
+
+        We also copy helpers.py (into google.common.utils) and rename imports to use it from there.
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.google.common.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+        And also module_loading, which is used by helpers:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/helpers.py
+            +++ ./airflow/providers/google/common/utils/helpers.py
+            @@ -26,7 +26,7 @@
+             from jinja2 import Template
+
+             from airflow.exceptions import AirflowException
+            -from airflow.utils.module_loading import import_string
+            +from airflow.providers.google.common.utils.module_loading import import_string
+
+             KEY_REGEX = re.compile(r'^[\\w.-]+$')
+
+        """
+        # noinspection PyUnusedLocal
+        def google_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/google/")
+
+        # noinspection PyUnusedLocal
+        def pure_airflow_models_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            """Check if select is exactly [airflow, . , models]"""
+            return len([ch for ch in node.children[1].leaves()]) == 3
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("google"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "python_virtualenv.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "python_virtualenv.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "helpers.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "module_loading.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "module_loading.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.python_virtualenv").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.python_virtualenv")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "process_utils.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "process_utils.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.process_utils").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.process_utils")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.helpers")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.module_loading").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.module_loading")
+        )
+
+        (
+            # Fix BaseOperatorLinks imports
+            self.qry.select_module("airflow.models").
+            is_filename(include=r"bigquery\.py|mlengine\.py").
+            filter(callback=google_package_filter).
+            filter(pure_airflow_models_filter).
+            rename("airflow.models.baseoperator")
+        )
+        self.remove_class("GKEStartPodOperator")
+        (
+            self.qry.
+            select_class("GKEStartPodOperator").
+            filter(callback=google_package_filter).
+            is_filename(include=r"example_kubernetes_engine\.py").
+            rename("GKEPodOperator")
+        )
+
+    def refactor_odbc_package(self):
+        """
+        Fixes to "odbc" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy helpers.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.odbc.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+
+        """
+        # noinspection PyUnusedLocal
+        def odbc_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/odbc/")
+
+        os.makedirs(os.path.join(get_target_providers_folder(), "odbc", "utils"), exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("odbc"), "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py"),
+            os.path.join(get_target_providers_package_folder("odbc"), "utils", "helpers.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=odbc_package_filter).
+            rename("airflow.providers.odbc.utils.helpers")
+        )
+
+    def refactor_papermill_package(self):
+        """
+        Fixes to "papermill" providers package.
+
+        Copies some of the classes used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy the lineage package (its __init__.py and entities.py) and change the import as in the example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/papermill/example_dags/example_papermill.py
+            +++ ./airflow/providers/papermill/example_dags/example_papermill.py
+            @@ -26,8 +26,8 @@
+             import scrapbook as sb
+
+             from airflow import DAG
+            -from airflow.lineage import AUTO
+            -from airflow.operators.python import PythonOperator
+            +from airflow.providers.papermill.utils.lineage import AUTO
+            +from airflow.operators.python_operator import PythonOperator
+             from airflow.providers.papermill.operators.papermill import PapermillOperator
+             from airflow.utils.dates import days_ago
+             from airflow.version import version
+
+
+        Note also that the copied lineage __init__.py needs to be refactored as well, because it uses
+        the Operator class (which does not exist in Airflow 1.10.*). We have a base operator template
+        prepared that imports the BaseOperator as an Operator; we copy it as "base.py" into the
+        papermill.utils package (from template_base_operator.py) and rename the import to use it from there:
+
+        .. code-block:: diff
+
+            +++ ./airflow/providers/papermill/utils/lineage/__init__.py
+            @@ -27,7 +27,7 @@
+             import jinja2
+             from cattr import structure, unstructure
+
+            -from airflow.models.base import Operator
+            +from airflow.providers.papermill.utils.base import Operator
+             from airflow.utils.module_loading import import_string
+
+             ENV = jinja2.Environment()
+
+        """
+        # noinspection PyUnusedLocal
+        def papermill_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/papermill/")
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("papermill"), "utils", "lineage"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("papermill"), "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "lineage", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("papermill"), "utils", "lineage", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "lineage", "entities.py"),
+            os.path.join(get_target_providers_package_folder("papermill"), "utils", "lineage", "entities.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "backport_packages", "template_base_operator.py.txt"),
+            os.path.join(get_target_providers_package_folder("papermill"), "utils", "base.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.lineage.entities").
+            filter(callback=papermill_package_filter).
+            rename("airflow.providers.papermill.utils.lineage.entities")
+        )
+        (
+            self.qry.
+            select_module("airflow.lineage").
+            filter(callback=papermill_package_filter).
+            rename("airflow.providers.papermill.utils.lineage")
+        )
+        # Papermill uses lineage which uses Operator under the hood so we need to change it as well
+        (
+            self.qry.
+            select_module("airflow.models.base").
+            filter(callback=papermill_package_filter).
+            rename("airflow.providers.papermill.utils.base")
+        )
+
+    def do_refactor(self, in_process: bool = False) -> None:
+        self.rename_deprecated_modules()
+        self.refactor_amazon_package()
+        self.refactor_google_package()
+        self.refactor_odbc_package()
+        self.refactor_papermill_package()
+        self.remove_tags()
+        self.remove_super_init_call()
+        self.add_provide_context_to_python_operators()
+        self.remove_poke_mode_only_decorator()
+        # In order to debug Bowler - set in_process to True
+        self.qry.execute(write=True, silent=False, interactive=False, in_process=in_process)

Review comment:
       Cool. I think I fixed the last failure. This small note on how to debug Bowler, with in-process execution exposed as a switch, is super important - I accidentally committed it enabled a few times before and wondered why it ran so slowly :).
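   
   For reference, a hypothetical invocation sketch of the debug switch (the __main__ wiring is
   illustrative; copy_provider_sources and RefactorBackportPackages come from the file above):
   
   ```python
   # Run the refactor in-process so breakpoints/pdb work. The default multi-process
   # Bowler execution is much faster, but hard to step through - and easy to commit
   # by accident, which is why it is exposed as a switch.
   if __name__ == "__main__":
       copy_provider_sources()
       RefactorBackportPackages().do_refactor(in_process=True)
   ```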




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429937917



##########
File path: requirements/requirements-python3.7.txt
##########
@@ -115,7 +115,6 @@ elasticsearch-dsl==7.2.0
 elasticsearch==7.5.1
 email-validator==1.1.1
 entrypoints==0.3
-enum34==1.1.10

Review comment:
       Same here - I do not know, but I also do not care now - since all the generation is done automatically and we have tests confirming that this set of requirements works fine.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] BasPH commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
BasPH commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429614907



##########
File path: airflow/providers/odbc/hooks/odbc.py
##########
@@ -24,7 +24,23 @@
 import pyodbc
 
 from airflow.hooks.dbapi_hook import DbApiHook
-from airflow.utils.helpers import merge_dicts
+
+
+# We do not import it from airflow.utils because merge_dicts is not available in Airflow 1.10
+# So this operator would not be backportable
+def merge_dicts(dict1, dict2):

Review comment:
       Ah I see, I read it too quickly.
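   
   For context, the vendored helper only needs to be a recursive dict merge; a minimal sketch of such
   a function (illustrative - the actual body is truncated in the diff above):
   
   ```python
   def merge_dicts(dict1, dict2):
       """Recursively merge dict2 into dict1 and return the result (sketch, not the PR's exact code)."""
       merged = dict1.copy()
       for key, value in dict2.items():
           if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
               merged[key] = merge_dicts(merged[key], value)
           else:
               merged[key] = value
       return merged
   ```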




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429612332



##########
File path: airflow/providers/amazon/aws/hooks/batch_client.py
##########
@@ -36,7 +36,15 @@
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.typing_compat import Protocol, runtime_checkable

Review comment:
       Not a huge fan of this.
   
   Could we do this in the released versions only, with Bowler?
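   
   For illustration, a minimal Bowler sketch of doing that rename only in the generated backport
   sources (hypothetical - the path and target module mirror the approach used elsewhere in this PR):
   
   ```python
   from bowler import Query
   
   (
       Query("./airflow/providers/amazon/")
       .select_module("airflow.typing_compat")
       .rename("airflow.providers.amazon.common.utils.typing_compat")
       .execute(write=True, silent=False, interactive=False)
   )
   ```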




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429929467



##########
File path: setup.py
##########
@@ -506,7 +506,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
     "facebook": facebook,
     "ftp": [],
     "google": google,
-    "grpc": grpc,
+    "grpc": grpc + google,

Review comment:
       I'll push it in a moment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429931884



##########
File path: scripts/ci/in_container/kubernetes/app/templates/configmaps.template.yaml
##########
@@ -204,6 +204,7 @@ data:
     [hive]
     # Default mapreduce queue for HiveOperator tasks
     default_hive_mapred_queue =
+    mapred_job_name_template =

Review comment:
       Sure. I can catch the exception instead. 
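   
   A minimal sketch of the exception-catching approach (assuming the option is read via
   airflow.configuration.conf; the None fallback is illustrative):
   
   ```python
   from airflow.configuration import conf
   from airflow.exceptions import AirflowConfigException
   
   try:
       mapred_job_name_template = conf.get("hive", "mapred_job_name_template")
   except AirflowConfigException:
       # Option not present in airflow.cfg (e.g. older config files) - fall back to a default.
       mapred_job_name_template = None
   ```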




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r430061049



##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):
+        # AUTO inlets do not work with Airflow 1.* series.
+        check_output = PythonOperator(
+            task_id='check_out',
+            python_callable=check_notebook,
+            input_nb="/tmp/out-{{ execution_date }}.ipynb"

Review comment:
       PythonOperator doesn't have an input_nb arg.
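   
   If the notebook path really needs to reach the callable, one hypothetical alternative is to pass it
   via op_kwargs (this assumes check_notebook from the example above is adjusted to accept the extra
   kwarg, and that op_kwargs is templated in the target Airflow version):
   
   ```python
   from airflow.operators.python_operator import PythonOperator  # 1.10-style import
   
   check_output = PythonOperator(
       task_id='check_out',
       python_callable=check_notebook,
       op_kwargs={"input_nb": "/tmp/out-{{ execution_date }}.ipynb"},
   )
   ```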

##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):

Review comment:
       Rather than a version check I'd prefer we used "duck-typing", i.e.:
   
   ```python
   try:
       from airflow.lineage import AUTO
   except ImportError:
       AUTO = None
   
   
   ...
   
   if AUTO is not None:
       ...
   ```
   
   Also: we are still doing `from airflow.lineage import AUTO` which will fail on 1.10, so this check is pointless as it stands :)
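   
   A fuller sketch of that guard applied to the example (hypothetical - PythonOperator and
   check_notebook are the names used in the example DAG above):
   
   ```python
   try:
       from airflow.lineage import AUTO
   except ImportError:
       AUTO = None  # AUTO may not be importable on the target Airflow version
   
   if AUTO is not None:
       check_output = PythonOperator(
           task_id='check_out',
           python_callable=check_notebook,
           inlets=AUTO,
       )
   else:
       check_output = PythonOperator(
           task_id='check_out',
           python_callable=check_notebook,
       )
   ```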

##########
File path: backport_packages/import_all_provider_classes.py
##########
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import traceback
+from inspect import isclass
+from typing import List
+
+
+def import_all_provider_classes(source_path: str,
+                                provider_ids: List[str] = None,
+                                print_imports: bool = False) -> List[str]:
+    """
+    Imports all classes in providers packages. This method loads and imports
+    all the classes found in providers, so that we can find all the subclasses
+    of operators/sensors etc.
+
+    :param provider_ids: provider ids that should be loaded
+    :param print_imports: whether the imported classes should also be printed in the output
+    :param source_path: path to look for sources - might be None to look for all packages in all source paths
+    :return: list of all imported classes
+    """
+    if provider_ids:
+        prefixed_provider_paths = [source_path + "/airflow/providers/" + provider_id.replace(".", "/")
+                                   for provider_id in provider_ids]
+    else:
+        prefixed_provider_paths = [source_path + "/airflow/providers/"]
+
+    imported_classes = []
+    tracebacks = []
+    for root, dirs, files in os.walk(source_path):
+        if all([not root.startswith(prefix_provider_path)
+                for prefix_provider_path in prefixed_provider_paths]) or root.endswith("__pycache__"):
+            # Skip loading module if it is not in the list of providers that we are looking for
+            continue
+        package_name = root[len(source_path) + 1:].replace("/", ".")
+        for file in files:
+            if file.endswith(".py"):
+                module_name = package_name + "." + file[:-3] if file != "__init__.py" else package_name
+                if print_imports:
+                    print(f"Importing module: {module_name}")
+                # noinspection PyBroadException
+                try:
+                    _module = importlib.import_module(module_name)
+                    for attribute_name in dir(_module):
+                        class_name = module_name + "." + attribute_name
+                        attribute = getattr(_module, attribute_name)
+                        if isclass(attribute):
+                            if print_imports:
+                                print(f"Imported {class_name}")
+                            imported_classes.append(class_name)
+                except Exception:
+                    exception_str = traceback.format_exc()
+                    tracebacks.append(exception_str)
+    if tracebacks:
+        print()
+        print("ERROR: There were some import errors")
+        print()
+        for trace in tracebacks:
+            print("----------------------------------------")
+            print(trace)
+            print("----------------------------------------")
+        sys.exit(1)

Review comment:
       Argh, bad comment I left in a rush.
   
   I meant to say these should go to _sys.stderr_.

##########
File path: backport_packages/import_all_provider_classes.py
##########
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import traceback
+from inspect import isclass
+from typing import List
+
+
+def import_all_provider_classes(source_path: str,
+                                provider_ids: List[str] = None,
+                                print_imports: bool = False) -> List[str]:
+    """
+    Imports all classes in providers packages. This method loads and imports
+    all the classes found in providers, so that we can find all the subclasses
+    of operators/sensors etc.
+
+    :param provider_ids: provider ids that should be loaded
+    :param print_imports: whether the imported classes should also be printed in the output
+    :param source_path: path to look for sources - might be None to look for all packages in all source paths
+    :return: list of all imported classes
+    """
+    if provider_ids:
+        prefixed_provider_paths = [source_path + "/airflow/providers/" + provider_id.replace(".", "/")
+                                   for provider_id in provider_ids]
+    else:
+        prefixed_provider_paths = [source_path + "/airflow/providers/"]
+
+    imported_classes = []
+    tracebacks = []
+    for root, dirs, files in os.walk(source_path):
+        if all([not root.startswith(prefix_provider_path)
+                for prefix_provider_path in prefixed_provider_paths]) or root.endswith("__pycache__"):
+            # Skip loading module if it is not in the list of providers that we are looking for
+            continue
+        package_name = root[len(source_path) + 1:].replace("/", ".")
+        for file in files:
+            if file.endswith(".py"):
+                module_name = package_name + "." + file[:-3] if file != "__init__.py" else package_name
+                if print_imports:
+                    print(f"Importing module: {module_name}")
+                # noinspection PyBroadException
+                try:
+                    _module = importlib.import_module(module_name)
+                    for attribute_name in dir(_module):
+                        class_name = module_name + "." + attribute_name
+                        attribute = getattr(_module, attribute_name)
+                        if isclass(attribute):
+                            if print_imports:
+                                print(f"Imported {class_name}")
+                            imported_classes.append(class_name)
+                except Exception:
+                    exception_str = traceback.format_exc()
+                    tracebacks.append(exception_str)
+    if tracebacks:
+        print()
+        print("ERROR: There were some import errors")
+        print()
+        for trace in tracebacks:
+            print("----------------------------------------")
+            print(trace)
+            print("----------------------------------------")

Review comment:
       ```suggestion
           print(file=sys.stderr)
           print("ERROR: There were some import errors", file=sys.stderr)
           print(file=sys.stderr)
           for trace in tracebacks:
               print("----------------------------------------", file=sys.stderr)
               print(trace, file=sys.stderr)
               print("----------------------------------------", file=sys.stderr)
   ```
   
   etc.

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery-related files.
+        :param src: source directory being copied
+        :param names: names of the files in that directory
+        :return: list of file names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helper has two methods (chain, cross_downstream) that were moved from the original helpers to
+    'airflow.models.baseoperator', so in 1.10 they should re-import the original 'airflow.utils.helpers'
+    methods. Those deprecated methods import their replacements with import_string("<IMPORT>"), so it is
+    easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition

Review comment:
       Minor point: does Bowler expose constants for these? Or if not, can you add a comment saying what these two types are.
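
   A minimal sketch of how those numbers could at least be decoded into grammar symbol names, assuming fissix mirrors lib2to3's `pygram` API (the helper name here is illustrative, not part of the PR):

   ```python
   # Illustrative helper: map fissix/lib2to3 node type numbers to their grammar
   # symbol names, so magic values like 300 or 311 can be documented or replaced
   # with named constants. Assumes fissix exposes pygram the same way lib2to3 does.
   from fissix import pygram


   def symbol_name(node_type: int) -> str:
       """Return the grammar symbol name for a numeric node type, if known."""
       return pygram.python_grammar.number2symbol.get(node_type, f"<unknown:{node_type}>")


   if __name__ == "__main__":
       for node_type in (300, 311):
           print(node_type, "->", symbol_name(node_type))
   ```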

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to the directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery-related files (and example DAGs that use them) when copying the sources.
+
+        :param src: source directory being copied
+        :param names: names of the files in that directory
+        :return: list of file names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from the original
+    helpers to 'airflow.models.baseoperator', so in 1.10 they should re-import the original
+    'airflow.utils.helpers' methods. Those deprecated methods import via import_string("<IMPORT>"),
+    so it is easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated (1.10) locations. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context=True to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0.0 the Python operators pass the context automatically, but we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - the "Core" operators do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv

Review comment:
       Bash and Dummy don't belong in this list

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to the directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery-related files (and example DAGs that use them) when copying the sources.
+
+        :param src: source directory being copied
+        :param names: names of the files in that directory
+        :return: list of file names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from the original
+    helpers to 'airflow.models.baseoperator', so in 1.10 they should re-import the original
+    'airflow.utils.helpers' methods. Those deprecated methods import via import_string("<IMPORT>"),
+    so it is easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated (1.10) locations. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context=True to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0.0 the Python operators pass the context automatically, but we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - the "Core" operators do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv

Review comment:
       Oh, I see. Nope this is fine.

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to the directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery-related files (and example DAGs that use them) when copying the sources.
+
+        :param src: source directory being copied
+        :param names: names of the files in that directory
+        :return: list of file names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from the original
+    helpers to 'airflow.models.baseoperator', so in 1.10 they should re-import the original
+    'airflow.utils.helpers' methods. Those deprecated methods import via import_string("<IMPORT>"),
+    so it is easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated (1.10) locations. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context=True to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0.0 the Python operators pass the context automatically, but we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - the "Core" operators do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").

Review comment:
       Can this be broadened to select any example dag instead of just this one specific one?
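
   For instance, a possible broader selector (an illustrative sketch, not the PR's code) could match any example DAG file with the same modifier, assuming only the `is_filename` regex needs to change:

   ```python
   # Hypothetical broader query: apply the provide_context modifier to
   # PythonOperator calls in any example DAG rather than one specific file.
   from bowler import Query


   def add_provide_context_to_all_example_dags(qry: Query, modifier) -> Query:
       return (
           qry
           .select_function("PythonOperator")
           .is_call()
           .is_filename(include=r"example_.*\.py$")  # any example DAG file
           .modify(modifier)
       )
   ```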

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,750 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to the directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery-related files (and example DAGs that use them) when copying the sources.
+
+        :param src: source directory being copied
+        :param names: names of the files in that directory
+        :return: list of file names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from the original
+    helpers to 'airflow.models.baseoperator', so in 1.10 they should re-import the original
+    'airflow.utils.helpers' methods. Those deprecated methods import via import_string("<IMPORT>"),
+    so it is easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated (1.10) locations. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context=True to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0.0 the Python operators pass the context automatically, but we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - the "Core" operators do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +

Review comment:
       Ahhh thanks. I wasn't aware of this broken behaviour of BaseHook
   
   I know I ask many things more than once -- this one I don't remember though.
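
   For reference, a tiny self-contained illustration of the 1.10 behaviour described above (assuming Airflow 1.10.x import paths; the hook class and connection id are made up):

   ```python
   # Illustration only: on Airflow 1.10.x, BaseHook.__init__ takes a required
   # `source` argument and does not initialise LoggingMixin, so hook subclasses
   # gain nothing from calling super().__init__() and usually skip it entirely.
   from airflow.hooks.base_hook import BaseHook


   class ExampleHook(BaseHook):
       def __init__(self, example_conn_id="example_default"):
           # super().__init__() would raise TypeError here (missing `source`),
           # and super().__init__(source=None) is effectively a no-op on 1.10.
           self.example_conn_id = example_conn_id

       def get_conn(self):
           return self.get_connection(self.example_conn_id)
   ```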

##########
File path: requirements/requirements-python3.7.txt
##########
@@ -115,7 +115,6 @@ elasticsearch-dsl==7.2.0
 elasticsearch==7.5.1
 email-validator==1.1.1
 entrypoints==0.3
-enum34==1.1.10

Review comment:
       Curious, this should have gone as soon as you merged #8648 (April 30, 2020), as this dep was removed in #5146 (Apr 22, 2019!)
   
   Anyway, yeah cool.

##########
File path: setup.py
##########
@@ -285,13 +285,15 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
     'pandas-gbq',
 ]
 grpc = [
+    'google-auth>=1.0.0, <2.0.0dev',
+    'google-auth-httplib2>=0.0.1',

Review comment:
       Thanks.

##########
File path: backport_packages/import_all_provider_classes.py
##########
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import traceback
+from inspect import isclass
+from typing import List
+
+
+def import_all_provider_classes(source_path: str,
+                                provider_ids: List[str] = None,
+                                print_imports: bool = False) -> List[str]:
+    """
+    Imports all classes in providers packages. This method loads and imports
+    all the classes found in providers, so that we can find all the subclasses
+    of operators/sensors etc.
+
+    :param source_path: path to look for the sources in
+    :param provider_ids: provider ids that should be loaded; if not specified, all providers are loaded
+    :param print_imports: if True, imported classes are also printed in the output
+    :return: list of all imported classes
+    """
+    if provider_ids:
+        prefixed_provider_paths = [source_path + "/airflow/providers/" + provider_id.replace(".", "/")
+                                   for provider_id in provider_ids]
+    else:
+        prefixed_provider_paths = [source_path + "/airflow/providers/"]
+
+    imported_classes = []
+    tracebacks = []
+    for root, dirs, files in os.walk(source_path):
+        if all([not root.startswith(prefix_provider_path)
+                for prefix_provider_path in prefixed_provider_paths]) or root.endswith("__pycache__"):
+            # Skip loading module if it is not in the list of providers that we are looking for
+            continue
+        package_name = root[len(source_path) + 1:].replace("/", ".")
+        for file in files:
+            if file.endswith(".py"):
+                module_name = package_name + "." + file[:-3] if file != "__init__.py" else package_name
+                if print_imports:
+                    print(f"Importing module: {module_name}")
+                # noinspection PyBroadException
+                try:
+                    _module = importlib.import_module(module_name)
+                    for attribute_name in dir(_module):
+                        class_name = module_name + "." + attribute_name
+                        attribute = getattr(_module, attribute_name)
+                        if isclass(attribute):
+                            if print_imports:
+                                print(f"Imported {class_name}")
+                            imported_classes.append(class_name)
+                except Exception:
+                    exception_str = traceback.format_exc()
+                    tracebacks.append(exception_str)
+    if tracebacks:
+        print()
+        print("ERROR: There were some import errors")
+        print()
+        for trace in tracebacks:
+            print("----------------------------------------")
+            print(trace)
+            print("----------------------------------------")

Review comment:
       (If this was the only thing I'd say leave it, but there are a few others too, including the import problem in example_papermill, so since we are still making changes anyway)

##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):

Review comment:
       We are still doing `from airflow.lineage import AUTO` which will fail on 1.10, so this check is pointless as it stands :)
   
   Rather than a version check I'd prefer we used "duck-typing" - i.e.:
   
   ```python
   try:
       from airflow.lineage import AUTO
   except ImportError:
       AUTO = None
   
   
   ...
   
   if AUTO is not None:
       ...
   ```
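
   A self-contained sketch of how that could look in the example DAG, with a stub standing in for the real check_notebook callable (names and the 1.10-style import path are illustrative):

   ```python
   # Duck-typing sketch: omit the inlets argument when lineage AUTO is unavailable,
   # instead of branching on the Airflow version string.
   from airflow.operators.python_operator import PythonOperator

   try:
       from airflow.lineage import AUTO
   except ImportError:
       AUTO = None


   def check_notebook(**kwargs):
       # stub standing in for the example DAG's real check_notebook callable
       return True


   extra_args = {"inlets": AUTO} if AUTO is not None else {}
   check_output = PythonOperator(
       task_id="check_out",
       python_callable=check_notebook,
       **extra_args,
   )
   ```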

##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):

Review comment:
       That seems odd? Lineage is part of core, isn't it? Does the lineage actually work like this? Nothing inside papermill_operator.py looks at lineage, so I don't see why we need to include it in the backport.
   
   Is this new in rc3/this PR, or can I see this behaviour in Rc2 too?
   
   This seems a higher-risk approach to me than updating an example to remove lineage=AUTO.

##########
File path: backport_packages/template_base_operator.py.txt
##########
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.models.baseoperator import BaseOperator as Operator

Review comment:
       I don't get why this file is needed. papermill_operator does `from airflow.models import BaseOperator`, which has worked since 1.10.3.
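
   For illustration, the import the comment refers to works on 1.10.3+ as well as 2.0, so an operator only needs something like the placeholder below (the class itself is made up):

   ```python
   # Placeholder operator showing the compatible import path: BaseOperator can be
   # imported from airflow.models on both 1.10.3+ and 2.0, so no templated
   # base-operator file should be required for that.
   from airflow.models import BaseOperator
   from airflow.utils.decorators import apply_defaults


   class ExamplePlaceholderOperator(BaseOperator):
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, **kwargs)

       def execute(self, context):
           self.log.info("placeholder operator executed")
   ```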

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to the directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery-related files (and example DAGs that use them) when copying the sources.
+
+        :param src: source directory being copied
+        :param names: names of the files in that directory
+        :return: list of file names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from the original
+    helpers to 'airflow.models.baseoperator', so in 1.10 they should re-import the original
+    'airflow.utils.helpers' methods. Those deprecated methods import via import_string("<IMPORT>"),
+    so it is easier to replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated (1.10) locations. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context=True to usages of Python/BranchPython operators in example DAGs.
+        Note that those changes apply to example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0.0 the Python operators pass the context automatically, but we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - the "Core" operators do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks.
+
+        In Airflow 1.10 almost none of the Hooks call super().__init__(). It was always broken in
+        Airflow 1.10 - BaseHook has its own __init__() which is wrongly implemented and requires a
+        source parameter to be passed::
+
+        .. code-block:: python
+
+            def __init__(self, source):
+                pass
+
+        We fixed it in 2.0, but for the entire 1.10 line calling super().__init__() is not a good
+        idea - it basically does nothing even if you do. And it's bad because it does not initialize
+        LoggingMixin (BaseHook derives from LoggingMixin), which is the main reason why Hook
+        logs sometimes do not work as they are supposed to:
+
+        .. code-block:: python
+
+            class LoggingMixin(object):
+                \"\"\"
+                Convenience super-class to have a logger configured with the class name
+                \"\"\"
+                def __init__(self, context=None):
+                    self._set_context(context)
+
+
+        There are two Hooks in 1.10 that call super().__init__():
+
+        .. code-block:: python
+
+               super(CloudSqlDatabaseHook, self).__init__(source=None)
+               super(MongoHook, self).__init__(source='mongo')
+
+        Not that it helps with anything, because __init__ in BaseHook does nothing. So we remove
+        the super().__init__() calls in Hooks when backporting to 1.10.
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +
+                     self.druid_ingest_conn_id = druid_ingest_conn_id
+                     self.timeout = timeout
+                     self.max_ingestion_time = max_ingestion_time
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_super_init_call_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for ch in node.post_order():
+                if isinstance(ch, Leaf) and ch.value == "super":
+                    if any(c.value for c in ch.parent.post_order() if isinstance(c, Leaf)):
+                        ch.parent.remove()
+
+        self.qry.select_subclass("BaseHook").modify(remove_super_init_call_modifier)
+
+    def remove_tags(self):
+        """
+        Removes the tags argument from DAG definitions (in example_dags). Note that those changes
+        apply to example DAGs, not to the operators/hooks etc. We package the example DAGs together
+        with the provider classes and they should serve as examples independently of the version
+        of Airflow they will be installed in. Tags are a feature added in 1.10.10, and occasionally
+        we will want to run example DAGs as system tests on pre-1.10.10 versions, so we want to
+        remove the tags here.
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+
+            -- ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            @@ -83,8 +83,7 @@
+             with models.DAG(
+                 "example_datasync_2",
+                 default_args=default_args,
+            -    schedule_interval=None,  # Override to match your needs
+            -    tags=['example'],
+            +    schedule_interval=None,
+             ) as dag:
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_tags_modifier(_: LN, capture: Capture, filename: Filename) -> None:
+            for node in capture['function_arguments'][0].post_order():
+                if isinstance(node, Leaf) and node.value == "tags" and node.type == TOKEN.NAME:
+                    if node.parent.next_sibling and node.parent.next_sibling.value == ",":
+                        node.parent.next_sibling.remove()
+                    node.parent.remove()
+
+        # Remove tags
+        self.qry.select_method("DAG").is_call().modify(remove_tags_modifier)
+
+    def remove_poke_mode_only_decorator(self):
+        """
+        Removes @poke_mode_only decorator. The decorator is only available in Airflow 2.0.
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/sensors/gcs.py
+            +++ ./airflow/providers/google/cloud/sensors/gcs.py
+            @@ -189,7 +189,6 @@
+                 return datetime.now()
+
+
+            -@poke_mode_only
+             class GCSUploadSessionCompleteSensor(BaseSensorOperator):
+                 \"\"\"
+                Checks for changes in the number of objects at prefix in Google Cloud Storage
+
+        """
+        def find_and_remove_poke_mode_only_import(node: LN):
+            for child in node.children:
+                if isinstance(child, Leaf) and child.type == 1 and child.value == 'poke_mode_only':
+                    import_node = child.parent
+                    # remove the import by default
+                    skip_import_remove = False
+                    if isinstance(child.prev_sibling, Leaf) and child.prev_sibling.value == ",":
+                        # remove comma before the whole import
+                        child.prev_sibling.remove()
+                        # do not remove if there are other imports
+                        skip_import_remove = True
+                    if isinstance(child.next_sibling, Leaf) and child.prev_sibling.value == ",":
+                        # but keep the one after and do not remove the whole import
+                        skip_import_remove = True
+                    # remove the import
+                    child.remove()
+                    if not skip_import_remove:
+                        # remove the import if there were no siblings
+                        import_node.remove()
+                else:
+                    find_and_remove_poke_mode_only_import(child)
+
+        def find_root_remove_import(node: LN):
+            current_node = node
+            while current_node.parent:
+                current_node = current_node.parent
+            find_and_remove_poke_mode_only_import(current_node)
+
+        def is_poke_mode_only_decorator(node: LN) -> bool:
+            return node.children and len(node.children) >= 2 and \
+                isinstance(node.children[0], Leaf) and node.children[0].value == '@' and \
+                isinstance(node.children[1], Leaf) and node.children[1].value == 'poke_mode_only'
+
+        # noinspection PyUnusedLocal
+        def remove_poke_mode_only_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for child in capture['node'].parent.children:
+                if is_poke_mode_only_decorator(child):
+                    find_root_remove_import(child)
+                    child.remove()
+
+        self.qry.select_subclass("BaseSensorOperator").modify(remove_poke_mode_only_modifier)
+
+    def refactor_amazon_package(self):
+        """
+        Fixes to "amazon" providers package.
+
+        Copies some of the modules used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy typing_compat.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/operators/ecs.py
+            +++ ./airflow/providers/amazon/aws/operators/ecs.py
+            @@ -24,7 +24,7 @@
+             from airflow.models import BaseOperator
+             from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+             from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+            -from airflow.typing_compat import Protocol, runtime_checkable
+            +from airflow.providers.amazon.common.utils.typing_compat import Protocol, runtime_checkable
+             from airflow.utils.decorators import apply_defaults
+
+        """
+
+        # noinspection PyUnusedLocal
+        def amazon_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/amazon/")
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("amazon"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "typing_compat.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "typing_compat.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.typing_compat").
+            filter(callback=amazon_package_filter).
+            rename("airflow.providers.amazon.common.utils.typing_compat")
+        )
+
+    def refactor_google_package(self):
+        """
+        Fixes to "google" providers package.
+
+        Copies some of the modules used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there. Note that in this case we also rename
+        the imports in the copied files.
+
+        For example we copy python_virtualenv.py, process_utils.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -28,11 +28,11 @@
+
+             from airflow.exceptions import AirflowException
+             from airflow.models import BaseOperator
+            -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+            +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+             from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook
+             from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+             from airflow.utils.decorators import apply_defaults
+            -from airflow.utils.process_utils import execute_in_subprocess, patch_environ
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+        In the copied python_virtualenv.py we also change the import of process_utils.py. This happens
+        automatically and is handled by Bowler.
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/python_virtualenv.py
+            +++ ./airflow/providers/google/common/utils/python_virtualenv.py
+            @@ -21,7 +21,7 @@
+             \"\"\"
+            from typing import List, Optional
+
+            -from airflow.utils.process_utils import execute_in_subprocess
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+            def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool)
+
+
+        We also rename BaseOperatorLink imports to the airflow.models.baseoperator module:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/mlengine.py
+            +++ ./airflow/providers/google/cloud/operators/mlengine.py
+            @@ -24,7 +24,7 @@
+             from typing import List, Optional
+
+             from airflow.exceptions import AirflowException
+            -from airflow.models import BaseOperator, BaseOperatorLink
+            +from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
+             from airflow.models.taskinstance import TaskInstance
+             from airflow.providers.google.cloud.hooks.mlengine import MLEngineHook
+             from airflow.utils.decorators import apply_defaults
+
+
+        We remove GKEStartPodOperator (example in remove_class method)
+
+
+        We also copy helpers.py (to google.common.utils) and rename imports to use it from there:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.google.common.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+        And also module_loading.py, which is used by helpers:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/helpers.py
+            +++ ./airflow/providers/google/common/utils/helpers.py
+            @@ -26,7 +26,7 @@
+             from jinja2 import Template
+
+             from airflow.exceptions import AirflowException
+            -from airflow.utils.module_loading import import_string
+            +from airflow.providers.google.common.utils.module_loading import import_string
+
+             KEY_REGEX = re.compile(r'^[\\w.-]+$')
+
+        """
+        # noinspection PyUnusedLocal
+        def google_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/google/")
+
+        # noinspection PyUnusedLocal
+        def pure_airflow_models_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            """Check if select is exactly [airflow, . , models]"""
+            return len([ch for ch in node.children[1].leaves()]) == 3
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("google"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "python_virtualenv.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "python_virtualenv.py")
+        )
+
+        copy_helper_py_file(os.path.join(
+            get_target_providers_package_folder("google"), "common", "utils", "helpers.py"))
+
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "module_loading.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "module_loading.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.python_virtualenv").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.python_virtualenv")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "process_utils.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "process_utils.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.process_utils").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.process_utils")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.helpers")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.module_loading").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.module_loading")
+        )
+
+        (
+            # Fix BaseOperatorLinks imports
+            self.qry.select_module("airflow.models").
+            is_filename(include=r"bigquery\.py|mlengine\.py").
+            filter(callback=google_package_filter).
+            filter(pure_airflow_models_filter).
+            rename("airflow.models.baseoperator")
+        )
+        self.remove_class("GKEStartPodOperator")
+        (
+            self.qry.
+            select_class("GKEStartPodOperator").
+            filter(callback=google_package_filter).
+            is_filename(include=r"example_kubernetes_engine\.py").
+            rename("GKEPodOperator")
+        )
+
+    def refactor_odbc_package(self):
+        """
+        Fixes to "odbc" providers package.
+
+        Copies some of the modules used from core Airflow to the "utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy helpers.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.odbc.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+
+        """
+        # noinspection PyUnusedLocal
+        def odbc_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/odbc/")
+
+        os.makedirs(os.path.join(get_target_providers_folder(), "odbc", "utils"), exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("odbc"), "utils", "__init__.py")
+        )
+        copy_helper_py_file(os.path.join(
+            get_target_providers_package_folder("odbc"), "utils", "helpers.py"))
+
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=odbc_package_filter).
+            rename("airflow.providers.odbc.utils.helpers")
+        )
+
+    def refactor_papermill_package(self):
+        """
+        Fixes to "papermill" providers package.
+
+        Copies some of the modules used from core Airflow to the "utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy the lineage module (its __init__.py) and change the import as in the example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/papermill/example_dags/example_papermill.py
+            +++ ./airflow/providers/papermill/example_dags/example_papermill.py
+            @@ -26,8 +26,8 @@
+             import scrapbook as sb
+
+             from airflow import DAG
+            -from airflow.lineage import AUTO
+            -from airflow.operators.python import PythonOperator
+            +from airflow.providers.papermill.utils.lineage import AUTO
+            +from airflow.operators.python_operator import PythonOperator
+             from airflow.providers.papermill.operators.papermill import PapermillOperator
+             from airflow.utils.dates import days_ago
+             from airflow.version import version
+
+
+        Note also that the copied lineage __init__.py needs to be refactored as well, because it uses
+        the Operator class (which does not exist in Airflow 1.10.*). We have a base operator template
+        prepared that imports BaseOperator as Operator; we copy it as "base.py" into the
+        papermill.utils package (from template_base_operator.py) and rename the import to use it from there:

Review comment:
       This seems like a lot of work, and how well have we tested that it actually works?
   
   Also, given how small the Papermill operator is and the problems it's causing/special cases it needs: should we not do a backport release of it?

##########
File path: airflow/providers/papermill/example_dags/example_papermill.py
##########
@@ -82,10 +83,18 @@ def check_notebook(inlets, execution_date):
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}
     )
 
-    check_output = PythonOperator(
-        task_id='check_out',
-        python_callable=check_notebook,
-        inlets=AUTO
-    )
+    if version.startswith("1."):

Review comment:
       That seems odd? Lineage is part of core, isn't it? Does the lineage actually work like this? Nothing inside papermill_operator.py looks at lineage.AUTO so I don't see why we need to include it in the backport.
   
   Is this new in rc3/this PR, or can I see this behaviour in Rc2 too?
   
   This seems a higher risk approach to me than updating the example to remove lineage=AUTO.
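   
   For illustration, that lower-risk version of the example task would roughly be the quoted snippet with the AUTO lineage dropped (a sketch only, reusing the task_id and callable names from the diff above):
   
    ```
    from airflow.operators.python_operator import PythonOperator  # 1.10 import path
    
    # check_notebook is the callable already defined earlier in the example DAG
    check_output = PythonOperator(
        task_id='check_out',
        python_callable=check_notebook,
    )
    ```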

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore bigquery files when copying the sources.
+        :param src: source directory that is being copied
+        :param names: names of the files in the source directory
+        :return: list of file names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helpers module has two methods (chain, cross_downstream) that were moved from it to
+    'airflow.models.baseoperator', so in 1.10 they should re-import the original 'airflow.utils.helpers'
+    methods. Those deprecated methods use import_string("<IMPORT>"), so it is easier to
+    replace them as strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames imported modules back to their deprecated 1.10 names. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context to usages of Python/BranchPython operators in example dags.
+        Note that those changes apply to the example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they will be installed in.
+        In Airflow 2.0.0 the Python operators pass the context automatically, so provide_context
+        is no longer needed there, but we are still using the "Core" operators from the Airflow
+        version that the backport packages are installed in - the "Core" operators do not have
+        (for now) their own provider package - so in 1.10 provide_context=True has to be added explicitly.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks.
+
+        In Airflow 1.10 almost none of the Hooks call super().__init__(). It was always broken in
+        Airflow 1.10 - BaseHook has its own __init__() which is wrongly implemented and requires a
+        source parameter to be passed:
+
+        .. code-block:: python
+
+            def __init__(self, source):
+                pass
+
+        We fixed it in 2.0, but for the entire 1.10 line calling super().__init__() is not a good idea -
+        it basically does nothing even if you call it, and it does not initialize
+        LoggingMixin (BaseHook derives from LoggingMixin). This is the main reason why Hook
+        logs sometimes do not work as they are supposed to:
+
+        .. code-block:: python
+
+            class LoggingMixin(object):
+                \"\"\"
+                Convenience super-class to have a logger configured with the class name
+                \"\"\"
+                def __init__(self, context=None):
+                    self._set_context(context)
+
+
+        There are two Hooks in 1.10 that call super().__init__():
+
+        .. code-block:: python
+
+               super(CloudSqlDatabaseHook, self).__init__(source=None)
+               super(MongoHook, self).__init__(source='mongo')
+
+        Not that it helps with anything, because __init__ in BaseHook does nothing. So we remove
+        the super().__init__() call in Hooks when backporting to 1.10.
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +
+                     self.druid_ingest_conn_id = druid_ingest_conn_id
+                     self.timeout = timeout
+                     self.max_ingestion_time = max_ingestion_time
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_super_init_call_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for ch in node.post_order():
+                if isinstance(ch, Leaf) and ch.value == "super":
+                    if any(c.value for c in ch.parent.post_order() if isinstance(c, Leaf)):
+                        ch.parent.remove()
+
+        self.qry.select_subclass("BaseHook").modify(remove_super_init_call_modifier)
+
+    def remove_tags(self):
+        """
+        Removes tags from the DAG definitions in example_dags. Note that those changes
+        apply to the example DAGs, not to the operators/hooks etc. We package the example DAGs together
+        with the provider classes and they should serve as examples independently of the version
+        of Airflow they will be installed in. Tags are a feature added in 1.10.10 and occasionally
+        we will want to run the example DAGs as system tests in a pre-1.10.10 version, so we
+        remove the tags here.
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+
+            -- ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+            @@ -83,8 +83,7 @@
+             with models.DAG(
+                 "example_datasync_2",
+                 default_args=default_args,
+            -    schedule_interval=None,  # Override to match your needs
+            -    tags=['example'],
+            +    schedule_interval=None,
+             ) as dag:
+
+        """
+        # noinspection PyUnusedLocal
+        def remove_tags_modifier(_: LN, capture: Capture, filename: Filename) -> None:
+            for node in capture['function_arguments'][0].post_order():
+                if isinstance(node, Leaf) and node.value == "tags" and node.type == TOKEN.NAME:
+                    if node.parent.next_sibling and node.parent.next_sibling.value == ",":
+                        node.parent.next_sibling.remove()
+                    node.parent.remove()
+
+        # Remove tags
+        self.qry.select_method("DAG").is_call().modify(remove_tags_modifier)
+
+    def remove_poke_mode_only_decorator(self):
+        """
+        Removes @poke_mode_only decorator. The decorator is only available in Airflow 2.0.
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/sensors/gcs.py
+            +++ ./airflow/providers/google/cloud/sensors/gcs.py
+            @@ -189,7 +189,6 @@
+                 return datetime.now()
+
+
+            -@poke_mode_only
+             class GCSUploadSessionCompleteSensor(BaseSensorOperator):
+                 \"\"\"
+                Checks for changes in the number of objects at prefix in Google Cloud Storage
+
+        """
+        def find_and_remove_poke_mode_only_import(node: LN):
+            for child in node.children:
+                if isinstance(child, Leaf) and child.type == 1 and child.value == 'poke_mode_only':
+                    import_node = child.parent
+                    # remove the import by default
+                    skip_import_remove = False
+                    if isinstance(child.prev_sibling, Leaf) and child.prev_sibling.value == ",":
+                        # remove comma before the whole import
+                        child.prev_sibling.remove()
+                        # do not remove if there are other imports
+                        skip_import_remove = True
+                    if isinstance(child.next_sibling, Leaf) and child.prev_sibling.value == ",":
+                        # but keep the one after and do not remove the whole import
+                        skip_import_remove = True
+                    # remove the import
+                    child.remove()
+                    if not skip_import_remove:
+                        # remove the import if there were no siblings
+                        import_node.remove()
+                else:
+                    find_and_remove_poke_mode_only_import(child)
+
+        def find_root_remove_import(node: LN):
+            current_node = node
+            while current_node.parent:
+                current_node = current_node.parent
+            find_and_remove_poke_mode_only_import(current_node)
+
+        def is_poke_mode_only_decorator(node: LN) -> bool:
+            return node.children and len(node.children) >= 2 and \
+                isinstance(node.children[0], Leaf) and node.children[0].value == '@' and \
+                isinstance(node.children[1], Leaf) and node.children[1].value == 'poke_mode_only'
+
+        # noinspection PyUnusedLocal
+        def remove_poke_mode_only_modifier(node: LN, capture: Capture, filename: Filename) -> None:
+            for child in capture['node'].parent.children:
+                if is_poke_mode_only_decorator(child):
+                    find_root_remove_import(child)
+                    child.remove()
+
+        self.qry.select_subclass("BaseSensorOperator").modify(remove_poke_mode_only_modifier)
+
+    def refactor_amazon_package(self):
+        """
+        Fixes to "amazon" providers package.
+
+        Copies some of the modules used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy typing_compat.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/operators/ecs.py
+            +++ ./airflow/providers/amazon/aws/operators/ecs.py
+            @@ -24,7 +24,7 @@
+             from airflow.models import BaseOperator
+             from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+             from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+            -from airflow.typing_compat import Protocol, runtime_checkable
+            +from airflow.providers.amazon.common.utils.typing_compat import Protocol, runtime_checkable
+             from airflow.utils.decorators import apply_defaults
+
+        """
+
+        # noinspection PyUnusedLocal
+        def amazon_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/amazon/")
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("amazon"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "typing_compat.py"),
+            os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "typing_compat.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.typing_compat").
+            filter(callback=amazon_package_filter).
+            rename("airflow.providers.amazon.common.utils.typing_compat")
+        )
+
+    def refactor_google_package(self):
+        """
+        Fixes to "google" providers package.
+
+        Copies some of the modules used from core Airflow to the "common.utils" package of
+        the provider and renames imports to use them from there. Note that in this case we also rename
+        the imports in the copied files.
+
+        For example we copy python_virtualenv.py, process_utils.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -28,11 +28,11 @@
+
+             from airflow.exceptions import AirflowException
+             from airflow.models import BaseOperator
+            -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+            +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+             from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook
+             from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+             from airflow.utils.decorators import apply_defaults
+            -from airflow.utils.process_utils import execute_in_subprocess, patch_environ
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+        In the copied python_virtualenv.py we also change the import of process_utils.py. This happens
+        automatically and is handled by Bowler.
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/python_virtualenv.py
+            +++ ./airflow/providers/google/common/utils/python_virtualenv.py
+            @@ -21,7 +21,7 @@
+             \"\"\"
+            from typing import List, Optional
+
+            -from airflow.utils.process_utils import execute_in_subprocess
+            +from airflow.providers.google.common.utils.process_utils import execute_in_subprocess
+
+
+            def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool)
+
+
+        We also rename BaseOperatorLink imports to the airflow.models.baseoperator module:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/mlengine.py
+            +++ ./airflow/providers/google/cloud/operators/mlengine.py
+            @@ -24,7 +24,7 @@
+             from typing import List, Optional
+
+             from airflow.exceptions import AirflowException
+            -from airflow.models import BaseOperator, BaseOperatorLink
+            +from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
+             from airflow.models.taskinstance import TaskInstance
+             from airflow.providers.google.cloud.hooks.mlengine import MLEngineHook
+             from airflow.utils.decorators import apply_defaults
+
+
+        We remove GKEStartPodOperator (example in remove_class method)
+
+
+        We also copy helpers.py (to google.common.utils) and rename imports to use it from there:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.google.common.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+        And also module_loading.py, which is used by helpers:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/common/utils/helpers.py
+            +++ ./airflow/providers/google/common/utils/helpers.py
+            @@ -26,7 +26,7 @@
+             from jinja2 import Template
+
+             from airflow.exceptions import AirflowException
+            -from airflow.utils.module_loading import import_string
+            +from airflow.providers.google.common.utils.module_loading import import_string
+
+             KEY_REGEX = re.compile(r'^[\\w.-]+$')
+
+        """
+        # noinspection PyUnusedLocal
+        def google_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/google/")
+
+        # noinspection PyUnusedLocal
+        def pure_airflow_models_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            """Check if select is exactly [airflow, . , models]"""
+            return len([ch for ch in node.children[1].leaves()]) == 3
+
+        os.makedirs(os.path.join(get_target_providers_package_folder("google"), "common", "utils"),
+                    exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "__init__.py")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "python_virtualenv.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "python_virtualenv.py")
+        )
+
+        copy_helper_py_file(os.path.join(
+            get_target_providers_package_folder("google"), "common", "utils", "helpers.py"))
+
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "module_loading.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils",
+                         "module_loading.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.python_virtualenv").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.python_virtualenv")
+        )
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "process_utils.py"),
+            os.path.join(get_target_providers_package_folder("google"), "common", "utils", "process_utils.py")
+        )
+        (
+            self.qry.
+            select_module("airflow.utils.process_utils").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.process_utils")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.helpers")
+        )
+
+        (
+            self.qry.
+            select_module("airflow.utils.module_loading").
+            filter(callback=google_package_filter).
+            rename("airflow.providers.google.common.utils.module_loading")
+        )
+
+        (
+            # Fix BaseOperatorLinks imports
+            self.qry.select_module("airflow.models").
+            is_filename(include=r"bigquery\.py|mlengine\.py").
+            filter(callback=google_package_filter).
+            filter(pure_airflow_models_filter).
+            rename("airflow.models.baseoperator")
+        )
+        self.remove_class("GKEStartPodOperator")
+        (
+            self.qry.
+            select_class("GKEStartPodOperator").
+            filter(callback=google_package_filter).
+            is_filename(include=r"example_kubernetes_engine\.py").
+            rename("GKEPodOperator")
+        )
+
+    def refactor_odbc_package(self):
+        """
+        Fixes to "odbc" providers package.
+
+        Copies some of the modules used from core Airflow to the "utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy helpers.py and change import as in example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            +++ ./airflow/providers/google/cloud/example_dags/example_datacatalog.py
+            @@ -37,7 +37,7 @@
+                 CloudDataCatalogUpdateTagTemplateOperator,
+             )
+             from airflow.utils.dates import days_ago
+            -from airflow.utils.helpers import chain
+            +from airflow.providers.odbc.utils.helpers import chain
+
+             default_args = {"start_date": days_ago(1)}
+
+
+        """
+        # noinspection PyUnusedLocal
+        def odbc_package_filter(node: LN, capture: Capture, filename: Filename) -> bool:
+            return filename.startswith("./airflow/providers/odbc/")
+
+        os.makedirs(os.path.join(get_target_providers_folder(), "odbc", "utils"), exist_ok=True)
+        copyfile(
+            os.path.join(get_source_airflow_folder(), "airflow", "utils", "__init__.py"),
+            os.path.join(get_target_providers_package_folder("odbc"), "utils", "__init__.py")
+        )
+        copy_helper_py_file(os.path.join(
+            get_target_providers_package_folder("odbc"), "utils", "helpers.py"))
+
+        (
+            self.qry.
+            select_module("airflow.utils.helpers").
+            filter(callback=odbc_package_filter).
+            rename("airflow.providers.odbc.utils.helpers")
+        )
+
+    def refactor_papermill_package(self):
+        """
+        Fixes to "papermill" providers package.
+
+        Copies some of the modules used from core Airflow to the "utils" package of
+        the provider and renames imports to use them from there.
+
+        We copy the lineage module (its __init__.py) and change the import as in the example diff:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/papermill/example_dags/example_papermill.py
+            +++ ./airflow/providers/papermill/example_dags/example_papermill.py
+            @@ -26,8 +26,8 @@
+             import scrapbook as sb
+
+             from airflow import DAG
+            -from airflow.lineage import AUTO
+            -from airflow.operators.python import PythonOperator
+            +from airflow.providers.papermill.utils.lineage import AUTO
+            +from airflow.operators.python_operator import PythonOperator
+             from airflow.providers.papermill.operators.papermill import PapermillOperator
+             from airflow.utils.dates import days_ago
+             from airflow.version import version
+
+
+        Note also that the copied lineage __init__.py needs to be refactored as well, because it uses
+        the Operator class (which does not exist in Airflow 1.10.*). We have a base operator template
+        prepared that imports BaseOperator as Operator; we copy it as "base.py" into the
+        papermill.utils package (from template_base_operator.py) and rename the import to use it from there:

Review comment:
       This seems like a lot of work, and how well have we tested that it actually works?
   
   Also, given how small the Papermill operator is and the problems it's causing/special cases it needs: should we not do a backport release of it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429825721



##########
File path: airflow/config_templates/config.yml
##########
@@ -672,7 +672,7 @@
       version_added: ~
       type: string
       example: ~
-      default: "Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}"

Review comment:
       Let me look at this one when I get to a computer, please




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429646668



##########
File path: airflow/providers/amazon/aws/hooks/batch_client.py
##########
@@ -36,7 +36,15 @@
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.typing_compat import Protocol, runtime_checkable

Review comment:
       I think I solved all of those small quirks in a good way. The only "hacky" thing is Papermill AUTO lineage, because AUTO lineage is basically incompatible with Airflow 1.10. But the manual lineage should work.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429614304



##########
File path: airflow/providers/odbc/hooks/odbc.py
##########
@@ -24,7 +24,23 @@
 import pyodbc
 
 from airflow.hooks.dbapi_hook import DbApiHook
-from airflow.utils.helpers import merge_dicts
+
+
+# We do not import it from airflow.utils because merge_dicts is not available in Airflow 1.10
+# So this operator would not be backportable
+def merge_dicts(dict1, dict2):

Review comment:
       ``{**dict1, **dict2}`` only merges the dictionaries at the top level. This function also merges nested dictionaries.
   ```
   >>> dict_a = {'A': {'B': {'C': 'D'}}}
   >>> dict_b = {'A': {'B': {'E': 'F'}}}
   >>> {**dict_a, **dict_b}
   {'A': {'B': {'E': 'F'}}}
   >>> merge_dicts(dict_a, dict_b)
   {'A': {'B': {'C': 'D', 'E': 'F'}}}
   ```
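   
   For reference, such a recursive merge can be sketched roughly like this (a minimal sketch of the behaviour shown above, not necessarily the exact code that ends up in the hook):
   
    ```
    def merge_dicts(dict1, dict2):
        """Recursively merge dict2 into dict1, merging nested dicts instead of replacing them."""
        merged = dict1.copy()
        for key, value in dict2.items():
            if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
                merged[key] = merge_dicts(merged[key], value)
            else:
                merged[key] = value
        return merged
    ```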




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#issuecomment-633624582






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#issuecomment-633690079






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429924893



##########
File path: setup.py
##########
@@ -506,7 +506,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
     "facebook": facebook,
     "ftp": [],
     "google": google,
-    "grpc": grpc,
+    "grpc": grpc + google,

Review comment:
       ? Why do we need the entire google cloud suite for grpc hooks? That seems excessively heavy.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429615602



##########
File path: airflow/providers/amazon/aws/hooks/batch_client.py
##########
@@ -36,7 +36,15 @@
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.typing_compat import Protocol, runtime_checkable

Review comment:
       Yeah, I do not like it either. Let me see if I can do something about it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429920356



##########
File path: backport_packages/import_all_provider_classes.py
##########
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import traceback
+from inspect import isclass
+from typing import List
+
+
+def import_all_provider_classes(source_path: str,
+                                provider_ids: List[str] = None,
+                                print_imports: bool = False) -> List[str]:
+    """
+    Imports all classes in providers packages. This method loads and imports
+    all the classes found in providers, so that we can find all the subclasses
+    of operators/sensors etc.
+
+    :param provider_ids: provider ids that should be loaded
+    :param print_imports: whether imported classes should also be printed in the output
+    :param source_path: path to look for sources - might be None to look for all packages in all source paths
+    :return: list of all imported classes
+    """
+    if provider_ids:
+        prefixed_provider_paths = [source_path + "/airflow/providers/" + provider_id.replace(".", "/")
+                                   for provider_id in provider_ids]
+    else:
+        prefixed_provider_paths = [source_path + "/airflow/providers/"]
+
+    imported_classes = []
+    tracebacks = []
+    for root, dirs, files in os.walk(source_path):
+        if all([not root.startswith(prefix_provider_path)
+                for prefix_provider_path in prefixed_provider_paths]) or root.endswith("__pycache__"):
+            # Skip loading module if it is not in the list of providers that we are looking for
+            continue
+        package_name = root[len(source_path) + 1:].replace("/", ".")
+        for file in files:
+            if file.endswith(".py"):
+                module_name = package_name + "." + file[:-3] if file != "__init__.py" else package_name
+                if print_imports:
+                    print(f"Importing module: {module_name}")
+                # noinspection PyBroadException
+                try:
+                    _module = importlib.import_module(module_name)
+                    for attribute_name in dir(_module):
+                        class_name = module_name + "." + attribute_name
+                        attribute = getattr(_module, attribute_name)
+                        if isclass(attribute):
+                            if print_imports:
+                                print(f"Imported {class_name}")
+                            imported_classes.append(class_name)
+                except Exception:
+                    exception_str = traceback.format_exc()
+                    tracebacks.append(exception_str)
+    if tracebacks:
+        print()
+        print("ERROR: There were some import errors")
+        print()
+        for trace in tracebacks:
+            print("----------------------------------------")
+            print(trace)
+            print("----------------------------------------")
+        sys.exit(1)

Review comment:
       All these prints should be to sys.stdout really.
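
    A tiny sketch of one way to make the stream explicit (the helper name and signature are my own, not from the PR):

```python
import sys


def report(message: str, stream=sys.stdout) -> None:
    # Sketch only: funnel the script's output through one helper so the
    # destination stream is explicit rather than implied by bare print() calls.
    print(message, file=stream)
```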

##########
File path: backport_packages/refactor_backport_packages.py
##########
@@ -0,0 +1,750 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from os.path import dirname
+from shutil import copyfile, copytree, rmtree
+from typing import List
+
+from backport_packages.setup_backport_packages import (
+    get_source_airflow_folder, get_source_providers_folder, get_target_providers_folder,
+    get_target_providers_package_folder, is_bigquery_non_dts_module,
+)
+from bowler import LN, TOKEN, Capture, Filename, Query
+from fissix.fixer_util import Comma, KeywordArg, Name
+from fissix.pytree import Leaf
+
+CLASS_TYPES = ["hooks", "operators", "sensors", "secrets", "protocols"]
+
+
+def copy_provider_sources() -> None:
+    """
+    Copies provider sources to directory where they will be refactored.
+    """
+    def rm_build_dir() -> None:
+        """
+        Removes build directory.
+        """
+        build_dir = os.path.join(dirname(__file__), "build")
+        if os.path.isdir(build_dir):
+            rmtree(build_dir)
+
+    def ignore_bigquery_files(src: str, names: List[str]) -> List[str]:
+        """
+        Ignore BigQuery-related files.
+        :param src: the directory being copied
+        :param names: names of the entries in that directory
+        :return: list of names to ignore
+        """
+        ignored_names = []
+        if any([src.endswith(os.path.sep + class_type) for class_type in CLASS_TYPES]):
+            ignored_names = [name for name in names
+                             if is_bigquery_non_dts_module(module_name=name)]
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                file_path = src + os.path.sep + file_name
+                with open(file_path, "rt") as file:
+                    text = file.read()
+                if any([f"airflow.providers.google.cloud.{class_type}.bigquery" in text
+                        for class_type in CLASS_TYPES]) or "_to_bigquery" in text:
+                    print(f"Ignoring {file_path}")
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_kubernetes_files(src: str, names: List[str]) -> List[str]:
+        ignored_names = []
+        if src.endswith(os.path.sep + "example_dags"):
+            for file_name in names:
+                if "example_kubernetes" in file_name:
+                    ignored_names.append(file_name)
+        return ignored_names
+
+    def ignore_some_files(src: str, names: List[str]) -> List[str]:
+        ignored_list = ignore_bigquery_files(src=src, names=names)
+        ignored_list.extend(ignore_kubernetes_files(src=src, names=names))
+        return ignored_list
+
+    rm_build_dir()
+    package_providers_dir = get_target_providers_folder()
+    if os.path.isdir(package_providers_dir):
+        rmtree(package_providers_dir)
+    copytree(get_source_providers_folder(), get_target_providers_folder(), ignore=ignore_some_files)
+
+
+def copy_helper_py_file(target_file_path: str) -> None:
+    """
+    Copies airflow/utils/helpers.py to a new location within the provider package.
+
+    The helper has two methods (chain, cross_downstream) that were moved from the original helpers to
+    'airflow.models.baseoperator', so in 1.10 they should re-import the original 'airflow.utils.helpers'
+    methods. Those deprecated methods use import_string("<IMPORT>") internally, so it is easier to
+    replace them as plain strings rather than with Bowler.
+
+    :param target_file_path: target path name for the helpers.py
+    """
+
+    source_helper_file_path = os.path.join(get_source_airflow_folder(), "airflow", "utils", "helpers.py")
+
+    with open(source_helper_file_path, "rt") as in_file:
+        with open(target_file_path, "wt") as out_file:
+            for line in in_file:
+                out_file.write(line.replace('airflow.models.baseoperator', 'airflow.utils.helpers'))
+
+
+class RefactorBackportPackages:
+    """
+    Refactors the code of providers, so that it works in 1.10.
+
+    """
+
+    def __init__(self):
+        self.qry = Query()
+
+    def remove_class(self, class_name) -> None:
+        """
+        Removes class altogether. Example diff generated:
+
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            +++ ./airflow/providers/google/cloud/operators/kubernetes_engine.py
+            @@ -179,86 +179,3 @@
+            -
+            -class GKEStartPodOperator(KubernetesPodOperator):
+            -
+            - ...
+
+        :param class_name: name to remove
+        """
+        # noinspection PyUnusedLocal
+        def _remover(node: LN, capture: Capture, filename: Filename) -> None:
+            if node.type not in (300, 311):  # remove only definition
+                node.remove()
+
+        self.qry.select_class(class_name).modify(_remover)
+
+    def rename_deprecated_modules(self) -> None:
+        """
+        Renames back to deprecated modules imported. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/dingding/operators/dingding.py
+            +++ ./airflow/providers/dingding/operators/dingding.py
+            @@ -16,7 +16,7 @@
+             # specific language governing permissions and limitations
+             # under the License.
+
+            -from airflow.operators.bash import BaseOperator
+            +from airflow.operators.bash_operator import BaseOperator
+             from airflow.providers.dingding.hooks.dingding import DingdingHook
+             from airflow.utils.decorators import apply_defaults
+
+        """
+        changes = [
+            ("airflow.operators.bash", "airflow.operators.bash_operator"),
+            ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.utils.session", "airflow.utils.db"),
+            (
+                "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
+                "airflow.contrib.operators.kubernetes_pod_operator"
+            ),
+        ]
+        for new, old in changes:
+            self.qry.select_module(new).rename(old)
+
+    def add_provide_context_to_python_operators(self) -> None:
+        """
+
+        Adds provide_context to usages of the Python/BranchPython operators in example DAGs.
+        Note that those changes apply to the example DAGs, not to the operators/hooks etc.
+        We package the example DAGs together with the provider classes and they should serve as
+        examples independently of the version of Airflow they are installed in.
+        Automatic context provision in the Python operators is a 2.0.0 feature, and we are still
+        using the "Core" operators from the Airflow version that the backport packages are installed
+        in - the "Core" operators do not have (for now) their own provider package.
+
+        The core operators are:
+
+            * Python
+            * BranchPython
+            * Bash
+            * Branch
+            * Dummy
+            * LatestOnly
+            * ShortCircuit
+            * PythonVirtualEnv
+
+
+        Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            +++ ./airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
+            @@ -105,7 +105,8 @@
+                         task_video_ids_to_s3.google_api_response_via_xcom,
+                         task_video_ids_to_s3.task_id
+                     ],
+            -        task_id='check_and_transform_video_ids'
+            +        task_id='check_and_transform_video_ids',
+            +        provide_context=True
+                 )
+
+        """
+        # noinspection PyUnusedLocal
+        def add_provide_context_to_python_operator(node: LN, capture: Capture, filename: Filename) -> None:
+            fn_args = capture['function_arguments'][0]
+            fn_args.append_child(Comma())
+
+            provide_context_arg = KeywordArg(Name('provide_context'), Name('True'))
+            provide_context_arg.prefix = fn_args.children[0].prefix
+            fn_args.append_child(provide_context_arg)
+
+        (
+            self.qry.
+            select_function("PythonOperator").
+            is_call().
+            is_filename(include=r"mlengine_operator_utils.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+        (
+            self.qry.
+            select_function("BranchPythonOperator").
+            is_call().
+            is_filename(include=r"example_google_api_to_s3_transfer_advanced.py$").
+            modify(add_provide_context_to_python_operator)
+        )
+
+    def remove_super_init_call(self):
+        """
+        Removes super().__init__() call from Hooks. Example diff generated:
+
+        .. code-block:: diff
+
+            --- ./airflow/providers/apache/druid/hooks/druid.py
+            +++ ./airflow/providers/apache/druid/hooks/druid.py
+            @@ -49,7 +49,7 @@
+                         timeout=1,
+                         max_ingestion_time=None):
+
+            -        super().__init__()
+            +

Review comment:
    Why do we want to do this? Druid inherits from BaseHook, surely we want to keep that call still?
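
    If the motivation is the older BaseHook constructor, a toy sketch of the clash (the 1.10 signature shown here is recalled from memory, so treat it as an assumption):

```python
# Toy illustration, not Airflow code: if BaseHook.__init__ in 1.10 still takes
# a positional `source` argument, a bare super().__init__() raises a TypeError.
class BaseHook110:  # stand-in for airflow.hooks.base_hook.BaseHook on 1.10
    def __init__(self, source):
        self.source = source


class MyDruidHook(BaseHook110):
    def __init__(self, timeout=1, max_ingestion_time=None):
        super().__init__()  # TypeError on 1.10: missing required `source`
        self.timeout = timeout
        self.max_ingestion_time = max_ingestion_time
```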

##########
File path: airflow/config_templates/config.yml
##########
@@ -672,7 +672,7 @@
       version_added: ~
       type: string
       example: ~
-      default: "Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}"

Review comment:
       Okay, wow, so this config option was introduced in #3534 from 2018, but was never included in any release.
   
   So I actually think the change as you have it is right, otherwise this operator wouldn't work without adding a new config option. 

##########
File path: airflow/providers/apache/hive/operators/hive.py
##########
@@ -95,8 +95,11 @@ def __init__(
         self.mapred_queue = mapred_queue
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
-        self.mapred_job_name_template = conf.get('hive',
-                                                 'mapred_job_name_template')
+        self.mapred_job_name_template = conf.get(
+            'hive', 'mapred_job_name_template', fallback='')
+        if self.mapred_job_name_template == '':
+            self.mapred_job_name_template = "Airflow HiveOperator task for " \
+                                            "{hostname}.{dag_id}.{task_id}.{execution_date}"

Review comment:
       ```suggestion
           self.mapred_job_name_template = conf.get(
               'hive', 'mapred_job_name_template', fallback="Airflow HiveOperator task for " \
                                               "{hostname}.{dag_id}.{task_id}.{execution_date}")
   ```
   
   (I've messed up the formatting, but I think you get the idea)
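
    For reference, a tidied-up sketch of the same idea (same fallback string as the change above; this is just a cleanup of the formatting, not the final code, and it sits inside HiveOperator.__init__):

```python
# Sketch: rely on conf.get()'s fallback directly instead of checking for '' afterwards.
self.mapred_job_name_template = conf.get(
    'hive',
    'mapred_job_name_template',
    fallback="Airflow HiveOperator task for {hostname}.{dag_id}.{task_id}.{execution_date}",
)
```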




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429665970



##########
File path: airflow/providers/papermill/ADDITIONAL_INFO.md
##########
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+## Additional notes
+
+Papermill operator is the only one that works with AUTO inlets for now (for lineage support).
+However since AUTO inlets is a feature of Airflow 2 and is not bacported,

Review comment:
       ```suggestion
   However since AUTO inlets is a feature of Airflow 2 and is not backported,
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#issuecomment-633691229






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429924264



##########
File path: scripts/ci/in_container/kubernetes/app/templates/configmaps.template.yaml
##########
@@ -204,6 +204,7 @@ data:
     [hive]
     # Default mapreduce queue for HiveOperator tasks
     default_hive_mapred_queue =
+    mapred_job_name_template =

Review comment:
       ```suggestion
   ```
   
    We shouldn't have to add empty/default values to this file.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#discussion_r429923925



##########
File path: requirements/requirements-python3.7.txt
##########
@@ -372,7 +371,7 @@ virtualenv==20.0.21
 watchtower==0.7.3
 wcwidth==0.1.9
 websocket-client==0.57.0
-wrapt==1.12.1
+wrapt==1.11.2

Review comment:
       Why the downgrade?

##########
File path: requirements/requirements-python3.7.txt
##########
@@ -115,7 +115,6 @@ elasticsearch-dsl==7.2.0
 elasticsearch==7.5.1
 email-validator==1.1.1
 entrypoints==0.3
-enum34==1.1.10

Review comment:
       Where'd this go?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #8991: All classes in backport providers are now importable in Airflow 1.10

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8991:
URL: https://github.com/apache/airflow/pull/8991#issuecomment-633248384


   @ashb @kaxil @BasPH @mik-laj 
   
   I think I nailed it with the import tests for backport packages. I removed all the README changes from the code changes so that it will be easier to review. I will regenerate the READMEs again in a separate commit on top of this one.
   
   I implemented all the refactorings that were needed in an automated way using Bowler. I actually started to like Bowler :).
   
   However, I got to the point where I had to refactor it - we had one huge method that did everything - so I separated all the refactoring code out of setup_backport_packages and converted it into a separate class (this way the Query was easier to build as a field of the class rather than passing the query around). @turbaszek -> you might want to take a look and see if you are OK with that, but I think the way it is done now is much more readable - we have separate refactoring methods for separate classes.
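
   A minimal sketch of that shared-Query pattern (the class and method names here are made up for illustration; the real class is RefactorBackportPackages in backport_packages/refactor_backport_packages.py):

```python
from bowler import Query


class ExampleRefactorer:
    """Sketch of the shared-Query pattern; not the actual RefactorBackportPackages class."""

    def __init__(self, source_dir: str) -> None:
        # one Query instance shared by all refactoring steps
        self.qry = Query(source_dir)

    def rename_module(self, new: str, old: str) -> None:
        # each step only adds selections/modifications to the shared query
        self.qry.select_module(new).rename(old)

    def apply(self) -> None:
        # run all accumulated transformations in one pass
        self.qry.execute(write=True, silent=False, interactive=False)
```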
   
   Now we also have two separate scripts for testing packages - one installs them one-by-one on Airflow 1.10 to verify that they can be installed, and the second installs all of them, finds all the relevant classes and imports them (and fails if any of those imports fail). It's done rather neatly, and I think it is a great tool to keep the backport packages usable in the future, as those tests will run continuously in CI with every PR. @BasPH -> it was a really good idea. I see that some of the upcoming changes (like functional DAGs) might unknowingly break the backward compatibility of provider packages (similar to the lineage changes for Papermill) - but with this test it will be a lot harder to introduce such problems.
   
   One last comment on Papermill - its AUTO lineage feature caused the import of examples to fail (yes - we are also checking the examples!), so I added a note that AUTO lineage will not work in 1.10 (and I updated the example so that it behaves correctly in Airflow 1.10 as well). @bolkedebruin - you might want to take a look at this part to see if I did not introduce any problems.
   
   
   
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org