You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/28 14:51:27 UTC
incubator-airflow git commit: [AIRFLOW-2530] KubernetesOperator supports multiple clusters
Repository: incubator-airflow
Updated Branches:
refs/heads/master f77a93191 -> 11e670ddb
[AIRFLOW-2530] KubernetesOperator supports multiple clusters
Closes #3425 from mrkm4ntr/airflow-2530
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/11e670dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/11e670dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/11e670dd
Branch: refs/heads/master
Commit: 11e670ddbce419489c798f26d3e94e7d3a00f5eb
Parents: f77a931
Author: Shintaro Murakami <mr...@gmail.com>
Authored: Mon May 28 16:51:22 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon May 28 16:51:22 2018 +0200
----------------------------------------------------------------------
airflow/contrib/kubernetes/kube_client.py | 16 ++++++++++------
airflow/contrib/kubernetes/pod_launcher.py | 5 +++--
.../contrib/operators/kubernetes_pod_operator.py | 10 ++++++++--
3 files changed, 21 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 35e8410..7552e99 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -17,16 +17,20 @@
from airflow.configuration import conf
-def _load_kube_config(in_cluster):
+def _load_kube_config(in_cluster, cluster_context):
from kubernetes import config, client
if in_cluster:
config.load_incluster_config()
return client.CoreV1Api()
else:
- config.load_kube_config()
- return client.CoreV1Api()
+ if cluster_context is None:
+ config.load_kube_config()
+ return client.CoreV1Api()
+ else:
+ return client.CoreV1Api(
+ api_client=config.new_client_from_config(context=cluster_context))
-def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')):
- # TODO: This should also allow people to point to a cluster.
- return _load_kube_config(in_cluster)
+def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'),
+ cluster_context=None):
+ return _load_kube_config(in_cluster, cluster_context)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index f1467a9..c1c3f30 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -37,9 +37,10 @@ class PodStatus(object):
class PodLauncher(LoggingMixin):
- def __init__(self, kube_client=None, in_cluster=True):
+ def __init__(self, kube_client=None, in_cluster=True, cluster_context=None):
super(PodLauncher, self).__init__()
- self._client = kube_client or get_kube_client(in_cluster=in_cluster)
+ self._client = kube_client or get_kube_client(in_cluster=in_cluster,
+ cluster_context=cluster_context)
self._watch = watch.Watch()
self.kube_req_factory = pod_fac.SimplePodRequestFactory()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 8e88b68..fa06b08 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -63,6 +63,9 @@ class KubernetesPodOperator(BaseOperator):
:type secrets: list of Secret
:param in_cluster: run kubernetes client with in_cluster configuration
:type in_cluster: bool
+ :param cluster_context: context that points to kubernetes cluster.
+ Ignored when in_cluster is True. If None, current-context is used.
+ :type cluster_context: string
:param get_logs: get the stdout of the container as logs of the tasks
:type get_logs: bool
:param affinity: A dict containing a group of affinity scheduling rules
@@ -72,7 +75,8 @@ class KubernetesPodOperator(BaseOperator):
def execute(self, context):
try:
- client = kube_client.get_kube_client(in_cluster=self.in_cluster)
+ client = kube_client.get_kube_client(in_cluster=self.in_cluster,
+ cluster_context=self.cluster_context)
gen = pod_generator.PodGenerator()
for mount in self.volume_mounts:
@@ -96,7 +100,7 @@ class KubernetesPodOperator(BaseOperator):
pod.resources = self.resources
pod.affinity = self.affinity
- launcher = pod_launcher.PodLauncher(client)
+ launcher = pod_launcher.PodLauncher(kube_client=client)
final_state = launcher.run_pod(
pod,
startup_timeout=self.startup_timeout_seconds,
@@ -120,6 +124,7 @@ class KubernetesPodOperator(BaseOperator):
env_vars=None,
secrets=None,
in_cluster=False,
+ cluster_context=None,
labels=None,
startup_timeout_seconds=120,
get_logs=True,
@@ -142,6 +147,7 @@ class KubernetesPodOperator(BaseOperator):
self.volumes = volumes or []
self.secrets = secrets or []
self.in_cluster = in_cluster
+ self.cluster_context = cluster_context
self.get_logs = get_logs
self.image_pull_policy = image_pull_policy
self.annotations = annotations or {}