You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/06/06 18:53:09 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #4530: [AIRFLOW-3282] Implement Azure Kubernetes Service Operator

mik-laj commented on a change in pull request #4530: [AIRFLOW-3282] Implement Azure Kubernetes Service Operator
URL: https://github.com/apache/airflow/pull/4530#discussion_r291324318
 
 

 ##########
 File path: airflow/contrib/operators/azure_kubernetes_operator.py
 ##########
 @@ -0,0 +1,204 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Operator for Azure Kubernetes Service cluster operations
+"""
+
+from azure.mgmt.containerservice.models import ContainerServiceLinuxProfile
+from azure.mgmt.containerservice.models import ContainerServiceServicePrincipalProfile
+from azure.mgmt.containerservice.models import ContainerServiceSshConfiguration
+from azure.mgmt.containerservice.models import ContainerServiceSshPublicKey
+from azure.mgmt.containerservice.models import ContainerServiceStorageProfileTypes
+from azure.mgmt.containerservice.models import ManagedCluster
+from azure.mgmt.containerservice.models import ManagedClusterAgentPoolProfile
+
+from msrestazure.azure_exceptions import CloudError
+
+from airflow.contrib.hooks.azure_kubernetes_hook import AzureKubernetesServiceHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.utils.aks_utils import \
+    is_valid_ssh_rsa_public_key, get_poller_result, get_public_key, get_default_dns_prefix
+
+
+class AzureKubernetesOperator(BaseOperator):
+    """
+    Start a Azure Kubernetes Service
+
+    :param ci_conn_id: connection id of a
+        service principal which will be used to start the azure kubernetes service
+    :type ci_conn_id: str
+    :param resource_group: Required name of the resource group
+        wherein this container instance should be started
+    :type resource_group: str
+    :param name: Required name of this container. Please note this name
+        has to be unique in order to run containers in parallel.
+    :type name: str
+    :param ssh_key_value: the ssh value used to connect to machine to be used
+    :type ssh_key_value: str
+    :param dns_name_prefix: DNS prefix specified when creating the managed cluster.
+    :type region: dns_name_prefix
+    :param admin_username: The administrator username to use for Linux VMs.
+    :type admin_username: str
+    :param kubernetes_version: Version of Kubernetes specified when creating the managed cluster.
+    :type kubernetes_version: str
+    :param node_vm_size: Vm to be spin up.
+    :type node_vm_size: str or ContainerServiceVMSizeTypes Enum
+    :param node_osdisk_size: Size in GB to be used to specify the disk size for
+        every machine in this master/agent pool. If you specify 0, it will apply the default
+        osDisk size according to the vmSize specified.
+    :type node_osdisk_size: int
+    :param node_count: Number of agents (VMs) to host docker containers.
+        Allowed values must be in the range of 1 to 100 (inclusive). The default value is 1.
+    :type node_count: int
+    :param no_ssh_key: Specified if it is linuxprofile.
+    :type no_ssh_key: boolean
+    :param vnet_subnet_id: VNet SubnetID specifies the vnet's subnet identifier.
+    :type vnet_subnet_id: str
+    :param max_pods: Maximum number of pods that can run on a node.
+    :type max_pods: int
+    :param os_type: OsType to be used to specify os type. Choose from Linux and Windows.
+        Default to Linux.
+    :type os_type: str or OSType Enum
+    :param tags: Resource tags.
+    :type tags: dict[str, str]
+    :param location: Required resource location
+    :type location: str
+
+    :Example:
+
+    >>> a = AzureKubernetesOperator(task_id="task",ci_conn_id='azure_kubernetes_default',
+            resource_group="my_resource_group",
+            name="my_aks_container",
+            ssh_key_value=None,
+            dns_name_prefix=None,
+            location="my_region",
+            tags=None
+        )
+    """
+
+    # Disabling pylint warnings - all of these arguments are required for cluster creation
+    # pylint: disable=too-many-instance-attributes
+    # pylint: disable=too-many-arguments
+    @apply_defaults
+    def __init__(self, ci_conn_id, resource_group, name, ssh_key_value,
+                 dns_name_prefix=None,
+                 location=None,
+                 admin_username="azureuser",
+                 kubernetes_version='',
+                 node_vm_size="Standard_DS2_v2",
+                 node_osdisk_size=0,
+                 node_count=3,
+                 no_ssh_key=False,
+                 vnet_subnet_id=None,
+                 max_pods=None,
+                 os_type="Linux",
+                 tags=None,
+                 *args, **kwargs):
+
+        self.ci_conn_id = ci_conn_id
+        self.resource_group = resource_group
+        self.name = name
+        self.no_ssh_key = no_ssh_key
+        self.dns_name_prefix = dns_name_prefix
+        self.location = location
+        self.admin_username = admin_username
+        self.node_vm_size = node_vm_size
+        self.node_count = node_count
+        self.ssh_key_value = ssh_key_value
+        self.vnet_subnet_id = vnet_subnet_id
+        self.max_pods = max_pods
+        self.os_type = os_type
+        self.tags = tags
+        self.node_osdisk_size = node_osdisk_size
+        self.kubernetes_version = kubernetes_version
+
+        super(AzureKubernetesOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        ci_hook = AzureKubernetesServiceHook(self.ci_conn_id)
+
+        containerservice = ci_hook.get_conn()
+
+        if not self.no_ssh_key:
+            try:
+                if not self.ssh_key_value or not is_valid_ssh_rsa_public_key(self.ssh_key_value):
+                    raise ValueError()
+            except (TypeError, ValueError):
+                self.ssh_key_value = get_public_key()
+
+        # dns_prefix
+        if not self.dns_name_prefix:
+            self.dns_name_prefix = get_default_dns_prefix(
+                self.name, self.resource_group, ci_hook.subscription_id)
+
+        # Check if the resource group exists
+        if ci_hook.check_resource(ci_hook.credentials, ci_hook.subscription_id, self.resource_group):
+            self.log.info("Resource group already existing: %s", self.resource_group)
+        else:
+            self.log.info("Creating resource %s", self.resource_group)
+            created_resource_group = ci_hook.create_resource(
+                ci_hook.credentials, ci_hook.subscription_id, self.resource_group, {
+                    'location': self.location})
+            print("Got resource group %s", created_resource_group.name)
 
 Review comment:
   ```suggestion
               logging.info("Got resource group %s", created_resource_group.name)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services