You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/13 18:02:34 UTC

[GitHub] TrevorEdwards commented on a change in pull request #3532: [AIRFLOW-2658] Add GCP specific k8s pod operator

TrevorEdwards commented on a change in pull request #3532: [AIRFLOW-2658] Add GCP specific k8s pod operator
URL: https://github.com/apache/incubator-airflow/pull/3532#discussion_r209703493
 
 

 ##########
 File path: airflow/contrib/operators/gcp_container_operator.py
 ##########
 @@ -170,3 +175,147 @@ def execute(self, context):
         hook = GKEClusterHook(self.project_id, self.location)
         create_op = hook.create_cluster(cluster=self.body)
         return create_op
+
+
+# Name of the environment variable the gcloud command consults to decide where
+# to save the kubernetes config file written by `get-credentials`.
+KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
+# Name of the environment variable used to point gcloud at a service account
+# key file when a non-default connection is configured.
+G_APP_CRED = "GOOGLE_APPLICATION_CREDENTIALS"
+
+
+class GKEPodOperator(KubernetesPodOperator):
+    template_fields = ('project_id', 'location',
+                       'cluster_name') + KubernetesPodOperator.template_fields
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 location,
+                 cluster_name,
+                 gcp_conn_id='google_cloud_default',
+                 *args,
+                 **kwargs):
+        """
+        Executes a task in a Kubernetes pod in the specified Google Kubernetes
+        Engine cluster
+
+        This Operator assumes that the system has gcloud installed and either
+        has working default application credentials or has configured a
+        connection id with a service account.
+
+        The **minimum** required to define a cluster to create are the variables
+        ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
+        ``namespace``, and ``image``
+
+        **Operator Creation**: ::
+
+            operator = GKEPodOperator(task_id='pod_op',
+                                      project_id='my-project',
+                                      location='us-central1-a',
+                                      cluster_name='my-cluster-name',
+                                      name='task-name',
+                                      namespace='default',
+                                      image='perl')
+
+        .. seealso::
+            For more detail about application authentication have a look at the reference:
+            https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
+
+        :param project_id: The Google Developers Console project id
+        :type project_id: str
+        :param location: The name of the Google Kubernetes Engine zone in which the
+            cluster resides, e.g. 'us-central1-a'
+        :type location: str
+        :param cluster_name: The name of the Google Kubernetes Engine cluster the pod
+            should be spawned in
+        :type cluster_name: str
+        :param gcp_conn_id: The google cloud connection id to use. This allows for
+            users to specify a service account.
+        :type gcp_conn_id: str
+        """
+        super(GKEPodOperator, self).__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.cluster_name = cluster_name
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context):
+        # Specifying a service account file allows the user to using non default
+        # authentication for creating a Kubernetes Pod. This is done by setting the
+        # environment variable `GOOGLE_APPLICATION_CREDENTIALS` that gcloud looks at.
+        key_file = None
+
+        # If gcp_conn_id is not specified gcloud will use the default
+        # service account credentials.
+        if self.gcp_conn_id:
+            from airflow.hooks.base_hook import BaseHook
+            # extras is a deserialized json object
+            extras = BaseHook.get_connection(self.gcp_conn_id).extra_dejson
+            # key_file only gets set if a json file is created from a JSON string in
+            # the web ui, else none
+            key_file = self._set_env_from_extras(extras=extras)
+
+        # Write config to a temp file and set the environment variable to point to it.
+        # This is to avoid race conditions of reading/writing a single file
+        with tempfile.NamedTemporaryFile() as conf_file:
+            os.environ[KUBE_CONFIG_ENV_VAR] = conf_file.name
+            # Attempt to get/update credentials
+            # We call gcloud directly instead of using google-cloud-python api
+            # because there is no way to write kubernetes config to a file, which is
+            # required by KubernetesPodOperator.
+            # The gcloud command looks at the env variable `KUBECONFIG` for where to save
+            # the kubernetes config file.
+            subprocess.check_call(
+                ["gcloud", "container", "clusters", "get-credentials",
+                 self.cluster_name,
+                 "--zone", self.location,
+                 "--project", self.project_id])
+
+            # Since the key file is of type mkstemp() closing the file will delete it from
+            # the file system so it cannot be accessed after we don't need it anymore
+            if key_file:
+                key_file.close()
+
+            # Tell `KubernetesPodOperator` where the config file is located
+            self.config_file = os.environ[KUBE_CONFIG_ENV_VAR]
+            super(GKEPodOperator, self).execute(context)
+
+    def _set_env_from_extras(self, extras):
+        """
+        Sets the environment variable `GOOGLE_APPLICATION_CREDENTIALS` with either:
+
+        - The path to the keyfile from the specified connection id
+        - A generated file's path if the user specified JSON in the connection id. The
+            file is assumed to be deleted after the process dies due to how mkstemp()
+            works.
+
+        The environment variable is used inside the gcloud command to determine correct
+        service account to use.
+        """
+        key_path = self._get_field(extras, 'key_path', False)
+        keyfile_json_str = self._get_field(extras, 'keyfile_dict', False)
+
+        if not key_path and not keyfile_json_str:
+            self.log.info('Using gcloud with application default credentials.')
+        elif key_path:
+            os.environ[G_APP_CRED] = key_path
+        else:
+            # Write service account JSON to secure file for gcloud to reference
+            service_key = tempfile.NamedTemporaryFile(delete=False)
 
 Review comment:
   delete=False opens an avenue for leaks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services