Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/03/31 02:12:20 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

ephraimbuddy opened a new pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024
 
 
   ---
   This PR adds a hook and an operator for the Azure Batch service.
   https://issues.apache.org/jira/browse/AIRFLOW-4529
   
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] potiuk commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#discussion_r400620849
 
 

 ##########
 File path: setup.py
 ##########
 @@ -186,6 +186,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
     'watchtower~=0.7.3',
 ]
 azure = [
+    'azure-batch==8.0.0',
 
 Review comment:
   We should use `>=` here. By definition, requirements in setup.py should have an open upper bound.
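The suggestion above could look like the following minimal sketch. The `azure-batch` package and the `8.0.0` version come from the diff; the helper `has_open_upper_bound` is hypothetical and only illustrates what "open upper bound" means for a requirement string.

```python
# Sketch of the `azure` extra with an open upper bound, as the review suggests.
azure = [
    'azure-batch>=8.0.0',  # lower bound only: no `==` pin and no upper cap
]


def has_open_upper_bound(requirement: str) -> bool:
    """Hypothetical helper: True if the specifier has no exact pin or upper cap."""
    # `==` pins one version, `<` / `<=` cap the range, `~=` implies an upper cap.
    return not any(op in requirement for op in ('==', '<', '~='))
```

With this check, `'azure-batch>=8.0.0'` passes while the originally proposed `'azure-batch==8.0.0'` does not.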


[GitHub] [airflow] ephraimbuddy commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#discussion_r400716020
 
 

 ##########
 File path: setup.py
 ##########
 @@ -186,6 +186,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
     'watchtower~=0.7.3',
 ]
 azure = [
+    'azure-batch==8.0.0',
 
 Review comment:
   Alright. Noted. I'll change it. Thanks


[GitHub] [airflow] codecov-io commented on issue #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#issuecomment-607445924
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/8024?src=pr&el=h1) Report
   > Merging [#8024](https://codecov.io/gh/apache/airflow/pull/8024?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/340e947d5db0ae1a5d5ca2d464fb78779acadc69&el=desc) will **decrease** coverage by `27.43%`.
   > The diff coverage is `0.54%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/8024/graphs/tree.svg?width=650&height=150&src=pr&token=WdLKlKHOAU)](https://codecov.io/gh/apache/airflow/pull/8024?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #8024       +/-   ##
   ===========================================
   - Coverage   87.25%   59.82%   -27.44%     
   ===========================================
     Files         935      937        +2     
     Lines       45384    45566      +182     
   ===========================================
   - Hits        39601    27259    -12342     
   - Misses       5783    18307    +12524     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/8024?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/models/connection.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvY29ubmVjdGlvbi5weQ==) | `93.66% <ø> (-1.41%)` | :arrow_down: |
   | [...low/providers/microsoft/azure/hooks/azure\_batch.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbWljcm9zb2Z0L2F6dXJlL2hvb2tzL2F6dXJlX2JhdGNoLnB5) | `0.00% <0.00%> (ø)` | |
   | [...providers/microsoft/azure/operators/azure\_batch.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbWljcm9zb2Z0L2F6dXJlL29wZXJhdG9ycy9henVyZV9iYXRjaC5weQ==) | `0.00% <0.00%> (ø)` | |
   | [airflow/utils/db.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYi5weQ==) | `98.33% <100.00%> (+0.01%)` | :arrow_up: |
   | [airflow/providers/amazon/aws/hooks/kinesis.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9raW5lc2lzLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [airflow/providers/apache/livy/sensors/livy.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2xpdnkvc2Vuc29ycy9saXZ5LnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [airflow/providers/amazon/aws/sensors/redshift.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9zZW5zb3JzL3JlZHNoaWZ0LnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [airflow/providers/mysql/operators/s3\_to\_mysql.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbXlzcWwvb3BlcmF0b3JzL3MzX3RvX215c3FsLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [airflow/providers/postgres/operators/postgres.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcG9zdGdyZXMvb3BlcmF0b3JzL3Bvc3RncmVzLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [airflow/providers/microsoft/azure/operators/adx.py](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbWljcm9zb2Z0L2F6dXJlL29wZXJhdG9ycy9hZHgucHk=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [318 more](https://codecov.io/gh/apache/airflow/pull/8024/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/8024?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/8024?src=pr&el=footer). Last update [340e947...117d49e](https://codecov.io/gh/apache/airflow/pull/8024?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] ephraimbuddy commented on issue #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#issuecomment-607171477
 
 
   > Hey @ephraimbuddy -> please rebase now and run those `./breeze generate-requirements --python 3.6` `./breeze generate-requirements --python 3.7`
   
   Rebased and running it now. I am currently pulling the images, which might take some time because my machine is slow (i3).


[GitHub] [airflow] kaxil merged pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024
 
 
   


[GitHub] [airflow] kaxil commented on issue #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#issuecomment-608584938
 
 
   > LGTM @kaxil - do you want to have another look ?
   
   Checking now


[GitHub] [airflow] ephraimbuddy commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#discussion_r401561189
 
 

 ##########
 File path: airflow/providers/microsoft/azure/hooks/azure_batch.py
 ##########
 @@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import time
+from datetime import timedelta
+from typing import Optional
+
+from azure.batch import BatchServiceClient, batch_auth, models as batch_models
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+from airflow.utils import timezone
+
+
+class AzureBatchHook(BaseHook):
+    """
+    Hook for Azure Batch APIs
+    """
+
+    def __init__(self, azure_batch_conn_id='azure_batch_default'):
+        super().__init__()
+        self.conn_id = azure_batch_conn_id
+        self.connection = self.get_conn()
 
 Review comment:
   I have tried to add this, but I keep getting `binascii.Error: Incorrect padding` when I run the tests for the operator.
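One possible cause of this error, stated here as an assumption and not confirmed anywhere in this thread, is that the shared account key is base64-decoded when requests are signed, so a placeholder key in a test connection that is not valid base64 would fail. The sketch below reproduces the same exception with only the standard library; `is_valid_base64_key` and both key values are hypothetical.

```python
import base64
import binascii


def is_valid_base64_key(key: str) -> bool:
    """Hypothetical check: can the key be base64-decoded without error?"""
    try:
        base64.b64decode(key)
        return True
    except binascii.Error:
        # Raised, for example, with the message "Incorrect padding"
        return False


# A length that is not a multiple of 4 triggers "Incorrect padding".
placeholder_key = "abc"

# A fake key that is base64-encoded first decodes cleanly.
encoded_key = base64.b64encode(b"fake-account-key").decode("ascii")
```

If the assumption holds, using a base64-encoded dummy value for `account_key` in the test connection may avoid the error.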


[GitHub] [airflow] potiuk commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#discussion_r400620446
 
 

 ##########
 File path: airflow/providers/microsoft/azure/hooks/azure_batch.py
 ##########
 @@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import time
+from datetime import timedelta
+from typing import Optional
+
+from azure.batch import BatchServiceClient, batch_auth, models as batch_models
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+from airflow.utils import timezone
+
+
+class AzureBatchHook(BaseHook):
+    """
+    Hook for Azure Batch APIs
+    """
+
+    def __init__(self, azure_batch_conn_id='azure_batch_default'):
+        super().__init__()
+        self.conn_id = azure_batch_conn_id
+        self.connection = self.get_conn()
+        self.extra = self._connection().extra_dejson
+
+    def _connection(self):
+        """
+        Get connected to azure batch service
+        """
+        conn = self.get_connection(self.conn_id)
+        return conn
+
+    def get_conn(self):
+        """
+        Get the batch client connection
+        :return: Azure batch client
+        """
+        conn = self._connection()
+
+        def _get_required_param(name):
+            """Extract required parameter from extra JSON, raise exception if not found"""
+            value = conn.extra_dejson.get(name)
+            if not value:
+                raise AirflowException(
+                    'Extra connection option is missing required parameter: `{}`'.
+                    format(name))
+            return value
+        batch_account_name = _get_required_param('account_name')
+        batch_account_key = _get_required_param('account_key')
+        batch_account_url = _get_required_param('account_url')
+        credentials = batch_auth.SharedKeyCredentials(batch_account_name,
+                                                      batch_account_key)
+        batch_client = BatchServiceClient(
+            credentials,
+            batch_url=batch_account_url)
+        return batch_client
+
+    def configure_pool(self,
+                       pool_id: str,
+                       vm_size: str,
+                       display_name: Optional[str] = None,
+                       target_dedicated_nodes: Optional[int] = None,
+                       use_latest_image_and_sku: bool = False,
+                       vm_publisher: Optional[str] = None,
+                       vm_offer: Optional[str] = None,
+                       sku_starts_with: Optional[str] = None,
+                       **kwargs
+                       ):
+        """
+        Configures a pool
+
+        :param pool_id: A string that uniquely identifies the Pool within the Account
+        :type pool_id: str
+
+        :param vm_size: The size of virtual machines in the Pool.
+        :type vm_size: str
+
+        :param display_name: The display name for the Pool
+        :type display_name: str
+
+        :param target_dedicated_nodes: The desired number of dedicated Compute Nodes in the Pool.
+        :type target_dedicated_nodes: Optional[int]
+
+        :param use_latest_image_and_sku: Whether to use the latest verified vm image and sku
+        :type use_latest_image_and_sku: bool
+
+        :param vm_publisher: The publisher of the Azure Virtual Machines Marketplace Image.
+            For example, Canonical or MicrosoftWindowsServer.
+        :type vm_publisher: Optional[str]
+
+        :param vm_offer: The offer type of the Azure Virtual Machines Marketplace Image.
+            For example, UbuntuServer or WindowsServer.
+        :type vm_offer: Optional[str]
+
+        :param sku_starts_with: The start name of the sku to search
+        :type sku_starts_with: Optional[str]
+        """
+        if use_latest_image_and_sku:
+            self.log.info('Using latest verified virtual machine image with node agent sku')
+            sku_to_use, image_ref_to_use = \
+                self._get_latest_verified_image_vm_and_sku(publisher=vm_publisher,
+                                                           offer=vm_offer,
+                                                           sku_starts_with=sku_starts_with)
+            pool = batch_models.PoolAddParameter(
+                id=pool_id,
+                vm_size=vm_size,
+                display_name=display_name,
+                virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
+                    image_reference=image_ref_to_use,
+                    node_agent_sku_id=sku_to_use
+                ),
+                target_dedicated_nodes=target_dedicated_nodes,
+                **kwargs)
+
+        elif self.extra.get('os_family'):
+            self.log.info('Using cloud service configuration to create pool, '
+                          'virtual machine configuration ignored')
+            pool = batch_models.PoolAddParameter(
+                id=pool_id,
+                vm_size=vm_size,
+                display_name=display_name,
+                cloud_service_configuration=batch_models.CloudServiceConfiguration(
+                    os_family=self.extra.get('os_family'),
+                    os_version=self.extra.get('os_version')
+                ),
+                target_dedicated_nodes=target_dedicated_nodes,
+                **kwargs)
+
+        else:
+            self.log.info('Using virtual machine configuration to create a pool')
+            pool = batch_models.PoolAddParameter(
+                id=pool_id,
+                vm_size=vm_size,
+                display_name=display_name,
+                virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
+                    image_reference=batch_models.ImageReference(
+                        publisher=self.extra.get('vm_publisher'),
+                        offer=self.extra.get('vm_offer'),
+                        sku=self.extra.get('vm_sku'),
+                        version=self.extra.get("vm_version")
+                    ),
+                    node_agent_sku_id=self.extra.get('node_agent_sku_id')
+                ),
+                target_dedicated_nodes=target_dedicated_nodes,
+                **kwargs)
+        return pool
+
+    def create_pool(self, pool):
+        """
+        Creates a pool if not already existing
+        :param pool: the pool object to create
+        :type pool: batch_models.PoolAddParameter
+
+        """
+        try:
+            self.log.info("Attempting to create a pool: %s", pool.id)
+            self.connection.pool.add(pool)
+            self.log.info("Created pool: %s", pool.id)
+        except batch_models.BatchErrorException as e:
+            if e.error.code != "PoolExists":
+                raise
+            else:
+                self.log.info("Pool %s already exists", pool.id)
+
+    def _get_latest_verified_image_vm_and_sku(self, publisher, offer, sku_starts_with):
+        """
+        Get latest verified image vm and sku
+
+        :param publisher: The publisher of the Azure Virtual Machines Marketplace Image.
+            For example, Canonical or MicrosoftWindowsServer.
+        :type publisher: str
+        :param offer: The offer type of the Azure Virtual Machines Marketplace Image.
+            For example, UbuntuServer or WindowsServer.
+        :type offer: str
+        :param sku_starts_with: The start name of the sku to search
+        :type sku_starts_with: str
+        """
+
+        options = batch_models.AccountListSupportedImagesOptions(
+            filter="verificationType eq 'verified'")
+        images = self.connection.account.list_supported_images(
+            account_list_supported_images_options=options)
+        # pick the latest supported sku
+        skus_to_use = [
+            (image.node_agent_sku_id, image.image_reference) for image in images
+            if image.image_reference.publisher.lower() == publisher.lower() and
+            image.image_reference.offer.lower() == offer.lower() and
+            image.image_reference.sku.startswith(sku_starts_with)
+        ]
+
+        # pick first
+        agent_sku_id, image_ref_to_use = skus_to_use[0]
+        return agent_sku_id, image_ref_to_use
+
+    def wait_for_all_node_state(self, pool_id, node_state):
+        """
+        Wait for all nodes in a pool to reach given states
+
+        :param pool_id: A string that identifies the pool
+        :type pool_id: str
+        :param node_state: A set of batch_models.ComputeNodeState
+        :type node_state: set
+        """
+        self.log.info('waiting for all nodes in pool %s to reach one of: %s',
+                      pool_id, node_state)
+        i = 0
+        while True:
+            # refresh pool to ensure that there is no resize error
+            pool = self.connection.pool.get(pool_id)
+            if pool.resize_errors is not None:
+                resize_errors = "\n".join([repr(e) for e in pool.resize_errors])
+                raise RuntimeError(
+                    'resize error encountered for pool {}:\n{}'.format(
+                        pool.id, resize_errors))
+            nodes = list(self.connection.compute_node.list(pool.id))
+            if (len(nodes) >= pool.target_dedicated_nodes and
+                    all(node.state in node_state for node in nodes)):
+                return nodes
+            i += 1
+            if i % 3 == 0:
 
 Review comment:
   Why the magic constant :)? 
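A sketch of how the polling loop might avoid the bare `3`. The quoted diff stops at `if i % 3 == 0:`, so the assumption here is that the constant controls how often a progress message is logged. The names `POLL_INTERVAL_SECONDS`, `LOG_EVERY_N_POLLS`, `wait_for_states`, and the `max_polls` limit are all hypothetical and not part of the PR.

```python
import time
from typing import Callable, Iterable, List

POLL_INTERVAL_SECONDS = 10  # hypothetical pause between polls
LOG_EVERY_N_POLLS = 3       # named replacement for the bare `3` in `i % 3 == 0`


def wait_for_states(fetch_states: Callable[[], List[str]],
                    wanted: Iterable[str],
                    poll_interval: float = POLL_INTERVAL_SECONDS,
                    log_every: int = LOG_EVERY_N_POLLS,
                    max_polls: int = 100) -> List[str]:
    """Poll `fetch_states` until every returned state is in `wanted`."""
    wanted_states = set(wanted)
    for attempt in range(1, max_polls + 1):
        states = fetch_states()
        if states and all(state in wanted_states for state in states):
            return states
        if attempt % log_every == 0:
            # Log only every few polls to keep the output readable
            print('still waiting after %d polls' % attempt)
        time.sleep(poll_interval)
    raise TimeoutError('states not reached after {} polls'.format(max_polls))
```

Passing the interval and the logging frequency as parameters also makes the loop easy to exercise in a unit test with `poll_interval=0`.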


[GitHub] [airflow] potiuk commented on issue #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#issuecomment-607144189
 
 
   Hey @ephraimbuddy -> please rebase now and run those `./breeze generate-requirements --python 3.6` `./breeze generate-requirements --python 3.7`
   


[GitHub] [airflow] kaxil commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#discussion_r401520091
 
 

 ##########
 File path: airflow/providers/microsoft/azure/hooks/azure_batch.py
 ##########
 @@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import time
+from datetime import timedelta
+from typing import Optional
+
+from azure.batch import BatchServiceClient, batch_auth, models as batch_models
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+from airflow.utils import timezone
+
+
+class AzureBatchHook(BaseHook):
+    """
+    Hook for Azure Batch APIs
+    """
+
+    def __init__(self, azure_batch_conn_id='azure_batch_default'):
+        super().__init__()
+        self.conn_id = azure_batch_conn_id
+        self.connection = self.get_conn()
 
 Review comment:
   I would say we can remove `self.connection` from here and call `get_conn` or `get_connection` from the Operator's `execute` method instead.
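
A minimal sketch of what this suggestion could look like, assuming the goal is to avoid creating the client at construction time. `LazyClientHook`, `ExampleOperator`, and `client_factory` are hypothetical stand-ins used only for illustration; they are not the Airflow `BaseHook` or the Azure `BatchServiceClient`, and the real hook may be structured differently.

```python
class LazyClientHook:
    """Hypothetical stand-in for a hook; it only illustrates lazy client creation."""

    def __init__(self, conn_id="azure_batch_default", client_factory=None):
        self.conn_id = conn_id
        # Assumption: the factory builds the service client from a connection id.
        self._client_factory = client_factory or (lambda cid: object())
        self._client = None  # nothing is created while the hook is constructed

    def get_conn(self):
        """Create the client on first use and reuse it afterwards."""
        if self._client is None:
            self._client = self._client_factory(self.conn_id)
        return self._client


class ExampleOperator:
    """Hypothetical operator that asks the hook for a client only in execute()."""

    def __init__(self, conn_id="azure_batch_default", client_factory=None):
        self.conn_id = conn_id
        self.client_factory = client_factory

    def execute(self):
        hook = LazyClientHook(self.conn_id, self.client_factory)
        return hook.get_conn()
```

With this shape, constructing the operator or the hook (for example while a DAG file is being parsed) does not open a connection; the client is only built when `execute` runs.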


[GitHub] [airflow] ephraimbuddy commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#discussion_r400715490
 
 

 ##########
 File path: airflow/providers/microsoft/azure/hooks/azure_batch.py
 ##########
 @@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import time
+from datetime import timedelta
+from typing import Optional
+
+from azure.batch import BatchServiceClient, batch_auth, models as batch_models
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+from airflow.utils import timezone
+
+
+class AzureBatchHook(BaseHook):
+    """
+    Hook for Azure Batch APIs
+    """
+
+    def __init__(self, azure_batch_conn_id='azure_batch_default'):
+        super().__init__()
+        self.conn_id = azure_batch_conn_id
+        self.connection = self.get_conn()
+        self.extra = self._connection().extra_dejson
+
+    def _connection(self):
+        """
+        Get connected to azure batch service
+        """
+        conn = self.get_connection(self.conn_id)
+        return conn
+
+    def get_conn(self):
+        """
+        Get the batch client connection
+        :return: Azure batch client
+        """
+        conn = self._connection()
+
+        def _get_required_param(name):
+            """Extract required parameter from extra JSON, raise exception if not found"""
+            value = conn.extra_dejson.get(name)
+            if not value:
+                raise AirflowException(
+                    'Extra connection option is missing required parameter: `{}`'.
+                    format(name))
+            return value
+        batch_account_name = _get_required_param('account_name')
+        batch_account_key = _get_required_param('account_key')
+        batch_account_url = _get_required_param('account_url')
+        credentials = batch_auth.SharedKeyCredentials(batch_account_name,
+                                                      batch_account_key)
+        batch_client = BatchServiceClient(
+            credentials,
+            batch_url=batch_account_url)
+        return batch_client
+
+    def configure_pool(self,
+                       pool_id: str,
+                       vm_size: str,
+                       display_name: Optional[str] = None,
+                       target_dedicated_nodes: Optional[int] = None,
+                       use_latest_image_and_sku: bool = False,
+                       vm_publisher: Optional[str] = None,
+                       vm_offer: Optional[str] = None,
+                       sku_starts_with: Optional[str] = None,
+                       **kwargs
+                       ):
+        """
+        Configures a pool
+
+        :param pool_id: A string that uniquely identifies the Pool within the Account
+        :type pool_id: str
+
+        :param vm_size: The size of virtual machines in the Pool.
+        :type vm_size: str
+
+        :param display_name: The display name for the Pool
+        :type display_name: str
+
+        :param target_dedicated_nodes: The desired number of dedicated Compute Nodes in the Pool.
+        :type target_dedicated_nodes: Optional[int]
+
+        :param use_latest_image_and_sku: Whether to use the latest verified vm image and sku
+        :type use_latest_image_and_sku: bool
+
+        :param vm_publisher: The publisher of the Azure Virtual Machines Marketplace Image.
+            For example, Canonical or MicrosoftWindowsServer.
+        :type vm_publisher: Optional[str]
+
+        :param vm_offer: The offer type of the Azure Virtual Machines Marketplace Image.
+            For example, UbuntuServer or WindowsServer.
+        :type vm_offer: Optional[str]
+
+        :param sku_starts_with: The start name of the sku to search
+        :type sku_starts_with: Optional[str]
+        """
+        if use_latest_image_and_sku:
+            self.log.info('Using latest verified virtual machine image with node agent sku')
+            sku_to_use, image_ref_to_use = \
+                self._get_latest_verified_image_vm_and_sku(publisher=vm_publisher,
+                                                           offer=vm_offer,
+                                                           sku_starts_with=sku_starts_with)
+            pool = batch_models.PoolAddParameter(
+                id=pool_id,
+                vm_size=vm_size,
+                display_name=display_name,
+                virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
+                    image_reference=image_ref_to_use,
+                    node_agent_sku_id=sku_to_use
+                ),
+                target_dedicated_nodes=target_dedicated_nodes,
+                **kwargs)
+
+        elif self.extra.get('os_family'):
+            self.log.info('Using cloud service configuration to create pool, '
+                          'virtual machine configuration ignored')
+            pool = batch_models.PoolAddParameter(
+                id=pool_id,
+                vm_size=vm_size,
+                display_name=display_name,
+                cloud_service_configuration=batch_models.CloudServiceConfiguration(
+                    os_family=self.extra.get('os_family'),
+                    os_version=self.extra.get('os_version')
+                ),
+                target_dedicated_nodes=target_dedicated_nodes,
+                **kwargs)
+
+        else:
+            self.log.info('Using virtual machine configuration to create a pool')
+            pool = batch_models.PoolAddParameter(
+                id=pool_id,
+                vm_size=vm_size,
+                display_name=display_name,
+                virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
+                    image_reference=batch_models.ImageReference(
+                        publisher=self.extra.get('vm_publisher'),
+                        offer=self.extra.get('vm_offer'),
+                        sku=self.extra.get('vm_sku'),
+                        version=self.extra.get("vm_version")
+                    ),
+                    node_agent_sku_id=self.extra.get('node_agent_sku_id')
+                ),
+                target_dedicated_nodes=target_dedicated_nodes,
+                **kwargs)
+        return pool
+
+    def create_pool(self, pool):
+        """
+        Creates a pool if not already existing
+        :param pool: the pool object to create
+        :type pool: batch_models.PoolAddParameter
+
+        """
+        try:
+            self.log.info("Attempting to create a pool: %s", pool.id)
+            self.connection.pool.add(pool)
+            self.log.info("Created pool: %s", pool.id)
+        except batch_models.BatchErrorException as e:
+            if e.error.code != "PoolExists":
+                raise
+            else:
+                self.log.info("Pool %s already exists", pool.id)
+
+    def _get_latest_verified_image_vm_and_sku(self, publisher, offer, sku_starts_with):
+        """
+        Get latest verified image vm and sku
+
+        :param publisher: The publisher of the Azure Virtual Machines Marketplace Image.
+            For example, Canonical or MicrosoftWindowsServer.
+        :type publisher: str
+        :param offer: The offer type of the Azure Virtual Machines Marketplace Image.
+            For example, UbuntuServer or WindowsServer.
+        :type offer: str
+        :param sku_starts_with: The start name of the sku to search
+        :type sku_starts_with: str
+        """
+
+        options = batch_models.AccountListSupportedImagesOptions(
+            filter="verificationType eq 'verified'")
+        images = self.connection.account.list_supported_images(
+            account_list_supported_images_options=options)
+        # pick the latest supported sku
+        skus_to_use = [
+            (image.node_agent_sku_id, image.image_reference) for image in images
+            if image.image_reference.publisher.lower() == publisher.lower() and
+            image.image_reference.offer.lower() == offer.lower() and
+            image.image_reference.sku.startswith(sku_starts_with)
+        ]
+
+        # pick first
+        agent_sku_id, image_ref_to_use = skus_to_use[0]
+        return agent_sku_id, image_ref_to_use
+
+    def wait_for_all_node_state(self, pool_id, node_state):
+        """
+        Wait for all nodes in a pool to reach given states
+
+        :param pool_id: A string that identifies the pool
+        :type pool_id: str
+        :param node_state: A set of batch_models.ComputeNodeState
+        :type node_state: set
+        """
+        self.log.info('waiting for all nodes in pool %s to reach one of: %s',
+                      pool_id, node_state)
+        i = 0
+        while True:
+            # refresh pool to ensure that there is no resize error
+            pool = self.connection.pool.get(pool_id)
+            if pool.resize_errors is not None:
+                resize_errors = "\n".join([repr(e) for e in pool.resize_errors])
+                raise RuntimeError(
+                    'resize error encountered for pool {}:\n{}'.format(
+                        pool.id, resize_errors))
+            nodes = list(self.connection.compute_node.list(pool.id))
+            if (len(nodes) >= pool.target_dedicated_nodes and
+                    all(node.state in node_state for node in nodes)):
+                return nodes
+            i += 1
+            if i % 3 == 0:
 
 Review comment:
   My mistake. I meant to raise an exception after checking the states three times, but I can see I never raised it. I'll correct that. Thanks for the review.
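
The intended behaviour described above (give up after a fixed number of checks) might be sketched as follows. `poll_until`, `max_attempts`, and `interval_s` are hypothetical names chosen for this illustration and are not taken from the PR; the correction that was eventually made may well differ, for example by using a time-based deadline instead of an attempt count.

```python
import time


def poll_until(check, max_attempts=3, interval_s=10.0, sleep=time.sleep):
    """Call check() until it returns a truthy value or the attempts run out.

    Hypothetical helper: `check` stands in for "list the nodes and verify
    that all of them have reached one of the wanted states".
    """
    for attempt in range(1, max_attempts + 1):
        result = check()
        if result:
            return result
        # Pause between checks, but not after the final failed one.
        if attempt < max_attempts:
            sleep(interval_s)
    raise TimeoutError(
        "condition not met after {} checks".format(max_attempts))
```

Passing `sleep` as a parameter is only a convenience so the loop can be exercised without real delays.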


[GitHub] [airflow] ephraimbuddy commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #8024: [AIRFLOW-4529] Add support for Azure Batch Service
URL: https://github.com/apache/airflow/pull/8024#discussion_r401539770
 
 

 ##########
 File path: airflow/providers/microsoft/azure/hooks/azure_batch.py
 ##########
 @@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import time
+from datetime import timedelta
+from typing import Optional
+
+from azure.batch import BatchServiceClient, batch_auth, models as batch_models
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+from airflow.utils import timezone
+
+
+class AzureBatchHook(BaseHook):
+    """
+    Hook for Azure Batch APIs
+    """
+
+    def __init__(self, azure_batch_conn_id='azure_batch_default'):
+        super().__init__()
+        self.conn_id = azure_batch_conn_id
+        self.connection = self.get_conn()
 
 Review comment:
   Alright. I'll fix it. Thanks :)
