You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/28 14:51:27 UTC

incubator-airflow git commit: [AIRFLOW-2530] KubernetesOperator supports multiple clusters

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f77a93191 -> 11e670ddb


[AIRFLOW-2530] KubernetesOperator supports multiple clusters

Closes #3425 from mrkm4ntr/airflow-2530


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/11e670dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/11e670dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/11e670dd

Branch: refs/heads/master
Commit: 11e670ddbce419489c798f26d3e94e7d3a00f5eb
Parents: f77a931
Author: Shintaro Murakami <mr...@gmail.com>
Authored: Mon May 28 16:51:22 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon May 28 16:51:22 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/kubernetes/kube_client.py           | 16 ++++++++++------
 airflow/contrib/kubernetes/pod_launcher.py          |  5 +++--
 .../contrib/operators/kubernetes_pod_operator.py    | 10 ++++++++--
 3 files changed, 21 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 35e8410..7552e99 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -17,16 +17,20 @@
 from airflow.configuration import conf
 
 
-def _load_kube_config(in_cluster):
+def _load_kube_config(in_cluster, cluster_context):
     from kubernetes import config, client
     if in_cluster:
         config.load_incluster_config()
         return client.CoreV1Api()
     else:
-        config.load_kube_config()
-        return client.CoreV1Api()
+        if cluster_context is None:
+            config.load_kube_config()
+            return client.CoreV1Api()
+        else:
+            return client.CoreV1Api(
+                api_client=config.new_client_from_config(context=cluster_context))
 
 
-def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')):
-    # TODO: This should also allow people to point to a cluster.
-    return _load_kube_config(in_cluster)
+def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'),
+                    cluster_context=None):
+    return _load_kube_config(in_cluster, cluster_context)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index f1467a9..c1c3f30 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -37,9 +37,10 @@ class PodStatus(object):
 
 
 class PodLauncher(LoggingMixin):
-    def __init__(self, kube_client=None, in_cluster=True):
+    def __init__(self, kube_client=None, in_cluster=True, cluster_context=None):
         super(PodLauncher, self).__init__()
-        self._client = kube_client or get_kube_client(in_cluster=in_cluster)
+        self._client = kube_client or get_kube_client(in_cluster=in_cluster,
+                                                      cluster_context=cluster_context)
         self._watch = watch.Watch()
         self.kube_req_factory = pod_fac.SimplePodRequestFactory()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 8e88b68..fa06b08 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -63,6 +63,9 @@ class KubernetesPodOperator(BaseOperator):
     :type secrets: list of Secret
     :param in_cluster: run kubernetes client with in_cluster configuration
     :type in_cluster: bool
+    :param cluster_context: context that points to kubernetes cluster.
+        Ignored when in_cluster is True. If None, current-context is used.
+    :type cluster_context: string
     :param get_logs: get the stdout of the container as logs of the tasks
     :type get_logs: bool
     :param affinity: A dict containing a group of affinity scheduling rules
@@ -72,7 +75,8 @@ class KubernetesPodOperator(BaseOperator):
 
     def execute(self, context):
         try:
-            client = kube_client.get_kube_client(in_cluster=self.in_cluster)
+            client = kube_client.get_kube_client(in_cluster=self.in_cluster,
+                                                 cluster_context=self.cluster_context)
             gen = pod_generator.PodGenerator()
 
             for mount in self.volume_mounts:
@@ -96,7 +100,7 @@ class KubernetesPodOperator(BaseOperator):
             pod.resources = self.resources
             pod.affinity = self.affinity
 
-            launcher = pod_launcher.PodLauncher(client)
+            launcher = pod_launcher.PodLauncher(kube_client=client)
             final_state = launcher.run_pod(
                 pod,
                 startup_timeout=self.startup_timeout_seconds,
@@ -120,6 +124,7 @@ class KubernetesPodOperator(BaseOperator):
                  env_vars=None,
                  secrets=None,
                  in_cluster=False,
+                 cluster_context=None,
                  labels=None,
                  startup_timeout_seconds=120,
                  get_logs=True,
@@ -142,6 +147,7 @@ class KubernetesPodOperator(BaseOperator):
         self.volumes = volumes or []
         self.secrets = secrets or []
         self.in_cluster = in_cluster
+        self.cluster_context = cluster_context
         self.get_logs = get_logs
         self.image_pull_policy = image_pull_policy
         self.annotations = annotations or {}