You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/10/18 05:02:22 UTC
[skywalking] branch master updated: Improve Kubernetes Pod lister with field selector (#9795)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 5abe6ceb1f Improve Kubernetes Pod lister with field selector (#9795)
5abe6ceb1f is described below
commit 5abe6ceb1ff0938be917f8b440c5eb3590cf31db
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Tue Oct 18 13:02:07 2022 +0800
Improve Kubernetes Pod lister with field selector (#9795)
---
.../oap/meter/analyzer/k8s/K8sInfoRegistry.java | 76 ++++++++++++----------
.../oap/meter/analyzer/dsl/K8sTagTest.java | 26 ++++++--
.../library/kubernetes/KubernetesPods.java | 51 ++++++++++-----
.../library/kubernetes/KubernetesServices.java | 19 ++++++
.../skywalking/library/kubernetes/ObjectID.java | 47 +++++++++++++
.../receiver/envoy/als/k8s/K8SServiceRegistry.java | 27 +++-----
6 files changed, 170 insertions(+), 76 deletions(-)
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
index 483b4a9a58..a82e185181 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
@@ -26,34 +26,31 @@ import java.util.Objects;
import java.util.Optional;
import org.apache.skywalking.library.kubernetes.KubernetesPods;
import org.apache.skywalking.library.kubernetes.KubernetesServices;
+import org.apache.skywalking.library.kubernetes.ObjectID;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1Service;
import lombok.SneakyThrows;
public class K8sInfoRegistry {
-
private final static K8sInfoRegistry INSTANCE = new K8sInfoRegistry();
- private final LoadingCache<String/* podName.namespace */, String /* serviceName.namespace */> podServiceMap;
- private final LoadingCache<String/* podIP */, String /* podName.namespace */> ipPodMap;
- private final LoadingCache<String/* serviceIP */, String /* serviceName.namespace */> ipServiceMap;
- private static final String SEPARATOR = ".";
+ private final LoadingCache<ObjectID /* Pod */, ObjectID /* Service */> podServiceMap;
+ private final LoadingCache<String/* podIP */, ObjectID /* Pod */> ipPodMap;
+ private final LoadingCache<String/* serviceIP */, ObjectID /* Service */> ipServiceMap;
private K8sInfoRegistry() {
ipPodMap = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(3))
.build(CacheLoader.from(ip -> KubernetesPods.INSTANCE
- .list()
- .stream()
- .filter(it -> it.getStatus() != null)
- .filter(it -> it.getMetadata() != null)
- .filter(it -> Objects.equals(it.getStatus().getPodIP(), ip))
- .map(it -> metadataID(it.getMetadata()))
- .findFirst()
- .orElse("")));
+ .findByIP(ip)
+ .map(it -> ObjectID
+ .builder()
+ .name(it.getMetadata().getName())
+ .namespace(it.getMetadata().getNamespace())
+ .build())
+ .orElse(ObjectID.EMPTY)));
ipServiceMap = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(3))
.build(CacheLoader.from(ip -> KubernetesServices.INSTANCE
@@ -69,25 +66,28 @@ public class K8sInfoRegistry {
it.getStatus().getLoadBalancer().getIngress() != null &&
it.getStatus().getLoadBalancer().getIngress().stream()
.anyMatch(ingress -> Objects.equals(ingress.getIp(), ip))))
- .map(it -> metadataID(it.getMetadata()))
+ .map(it -> ObjectID
+ .builder()
+ .name(it.getMetadata().getName())
+ .namespace(it.getMetadata().getNamespace())
+ .build())
.findFirst()
- .orElse("")));
+ .orElse(ObjectID.EMPTY)));
podServiceMap = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(3))
- .build(CacheLoader.from(podMetadataID -> {
+ .build(CacheLoader.from(podObjectID -> {
final Optional<V1Pod> pod = KubernetesPods.INSTANCE
- .list()
- .stream()
- .filter(it -> it.getMetadata() != null)
- .filter(it -> Objects.equals(
- metadataID(it.getMetadata()),
- podMetadataID))
- .findFirst();
+ .findByObjectID(
+ ObjectID
+ .builder()
+ .name(podObjectID.name())
+ .namespace(podObjectID.namespace())
+ .build());
if (!pod.isPresent()
|| pod.get().getMetadata() == null
|| pod.get().getMetadata().getLabels() == null) {
- return "";
+ return ObjectID.EMPTY;
}
final Optional<V1Service> service = KubernetesServices.INSTANCE
@@ -103,11 +103,13 @@ public class K8sInfoRegistry {
})
.findFirst();
if (!service.isPresent()) {
- return "";
+ return ObjectID.EMPTY;
}
- return service.get().getMetadata().getName()
- + SEPARATOR
- + service.get().getMetadata().getNamespace();
+ return ObjectID
+ .builder()
+ .name(service.get().getMetadata().getName())
+ .namespace(service.get().getMetadata().getNamespace())
+ .build();
}));
}
@@ -117,17 +119,23 @@ public class K8sInfoRegistry {
@SneakyThrows
public String findServiceName(String namespace, String podName) {
- return this.podServiceMap.get(podName + SEPARATOR + namespace);
+ return this.podServiceMap.get(
+ ObjectID
+ .builder()
+ .name(podName)
+ .namespace(namespace)
+ .build())
+ .toString();
}
@SneakyThrows
public String findPodByIP(String ip) {
- return this.ipPodMap.get(ip);
+ return this.ipPodMap.get(ip).toString();
}
@SneakyThrows
public String findServiceByIP(String ip) {
- return this.ipServiceMap.get(ip);
+ return this.ipServiceMap.get(ip).toString();
}
private boolean hasIntersection(Collection<?> o, Collection<?> c) {
@@ -140,8 +148,4 @@ public class K8sInfoRegistry {
}
return true;
}
-
- String metadataID(final V1ObjectMeta metadata) {
- return metadata.getName() + SEPARATOR + metadata.getNamespace();
- }
}
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
index a8abbfc9d6..f0f7fd50a5 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
@@ -30,12 +30,13 @@ import io.kubernetes.client.openapi.models.V1ServiceSpec;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
-
+import java.util.Optional;
import io.kubernetes.client.openapi.models.V1ServiceStatus;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.kubernetes.KubernetesPods;
import org.apache.skywalking.library.kubernetes.KubernetesServices;
+import org.apache.skywalking.library.kubernetes.ObjectID;
import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.Retag;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.junit.Before;
@@ -56,7 +57,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
@Slf4j
-@PowerMockIgnore("javax.net.ssl.*")
+@PowerMockIgnore({"javax.net.ssl.*", "javax.management.*"})
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(Parameterized.class)
@PrepareForTest({KubernetesPods.class, KubernetesServices.class})
@@ -266,12 +267,27 @@ public class K8sTagTest {
Mockito.mock(KubernetesPods.class)
);
- PowerMockito.when(KubernetesServices.INSTANCE, "list").thenReturn(ImmutableList.of(
+ PowerMockito.when(KubernetesServices.INSTANCE.list()).thenReturn(ImmutableList.of(
mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"),
mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"), "2.2.2.2")));
- PowerMockito.when(KubernetesPods.INSTANCE, "list").thenReturn(ImmutableList.of(
+ ImmutableList.of(
+ mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"),
+ mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"), "2.2.2.2"))
+ .forEach(svc ->
+ PowerMockito
+ .when(KubernetesServices.INSTANCE.findByID(ObjectID.builder().namespace(svc.getMetadata().getNamespace()).name(svc.getMetadata().getName()).build()))
+ .thenReturn(Optional.of(svc))
+ );
+ ImmutableList.of(
mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"), "1.1.1.1"),
- mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"), "1.1.1.2")));
+ mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"), "1.1.1.2"))
+ .forEach(pod -> {
+ PowerMockito
+ .when(KubernetesPods.INSTANCE.findByIP(pod.getStatus().getPodIP()))
+ .thenReturn(Optional.of(pod));
+ PowerMockito
+ .when(KubernetesPods.INSTANCE.findByObjectID(ObjectID.builder().name(pod.getMetadata().getName()).namespace(pod.getMetadata().getNamespace()).build())).thenReturn(Optional.of(pod));
+ });
}
@Test
diff --git a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java
index bb862f81db..776789288f 100644
--- a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java
+++ b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java
@@ -20,13 +20,10 @@
package org.apache.skywalking.library.kubernetes;
import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Pod;
import lombok.SneakyThrows;
@@ -34,7 +31,10 @@ import lombok.SneakyThrows;
public enum KubernetesPods {
INSTANCE;
- private final LoadingCache<KubernetesPods, List<V1Pod>> pods;
+ private static final String FIELD_SELECTOR_PATTERN_POD_IP = "status.podIP=%s";
+
+ private final LoadingCache<String, Optional<V1Pod>> podByIP;
+ private final LoadingCache<ObjectID, Optional<V1Pod>> podByObjectID;
@SneakyThrows
private KubernetesPods() {
@@ -43,24 +43,41 @@ public enum KubernetesPods {
final CoreV1Api coreV1Api = new CoreV1Api();
final CacheBuilder<Object, Object> cacheBuilder =
CacheBuilder.newBuilder()
- .expireAfterAccess(Duration.ofMinutes(3));
+ .expireAfterAccess(Duration.ofMinutes(5));
- pods = cacheBuilder.build(CacheLoader.from(() -> {
- try {
+ podByIP = cacheBuilder.build(new CacheLoader<String, Optional<V1Pod>>() {
+ @Override
+ public Optional<V1Pod> load(String ip) throws Exception {
return coreV1Api
.listPodForAllNamespaces(
- null, null, null, null, null,
- null, null, null, null, null)
- .getItems();
- } catch (ApiException e) {
- LoggerFactory.getLogger(getClass()).error("Failed to list Pods.", e);
- return Collections.emptyList();
+ null, null, String.format(FIELD_SELECTOR_PATTERN_POD_IP, ip),
+ null, null, null, null, null, null, null)
+ .getItems()
+ .stream()
+ .findFirst();
+ }
+ });
+
+ podByObjectID = cacheBuilder.build(new CacheLoader<ObjectID, Optional<V1Pod>>() {
+ @Override
+ public Optional<V1Pod> load(ObjectID objectID) throws Exception {
+ return Optional.ofNullable(
+ coreV1Api
+ .readNamespacedPod(
+ objectID.name(),
+ objectID.namespace(),
+ null));
}
- }));
+ });
+ }
+
+ @SneakyThrows
+ public Optional<V1Pod> findByIP(final String ip) {
+ return podByIP.get(ip);
}
@SneakyThrows
- public List<V1Pod> list() {
- return pods.get(this);
+ public Optional<V1Pod> findByObjectID(final ObjectID id) {
+ return podByObjectID.get(id);
}
}
diff --git a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java
index ed128d9e65..aff56f2874 100644
--- a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java
+++ b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java
@@ -22,6 +22,7 @@ package org.apache.skywalking.library.kubernetes;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import org.slf4j.LoggerFactory;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -35,6 +36,7 @@ public enum KubernetesServices {
INSTANCE;
private final LoadingCache<KubernetesServices, List<V1Service>> services;
+ private final LoadingCache<ObjectID, Optional<V1Service>> serviceByID;
@SneakyThrows
private KubernetesServices() {
@@ -57,10 +59,27 @@ public enum KubernetesServices {
return Collections.emptyList();
}
}));
+
+ serviceByID = cacheBuilder.build(new CacheLoader<ObjectID, Optional<V1Service>>() {
+ @Override
+ public Optional<V1Service> load(ObjectID id) throws Exception {
+ return Optional.ofNullable(
+ coreV1Api
+ .readNamespacedService(
+ id.name(),
+ id.namespace(),
+ null));
+ }
+ });
}
@SneakyThrows
public List<V1Service> list() {
return services.get(this);
}
+
+ @SneakyThrows
+ public Optional<V1Service> findByID(final ObjectID id) {
+ return serviceByID.get(id);
+ }
}
diff --git a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/ObjectID.java b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/ObjectID.java
new file mode 100644
index 0000000000..a9516e5659
--- /dev/null
+++ b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/ObjectID.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+ package org.apache.skywalking.library.kubernetes;
+
+import org.apache.logging.log4j.util.Strings;
+import lombok.Builder;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Accessors;
+
+@Data
+@Builder
+@RequiredArgsConstructor
+@Accessors(fluent = true)
+public class ObjectID {
+ public static final ObjectID EMPTY = ObjectID.builder().build();
+
+ private final String name;
+ private final String namespace;
+
+ @Override
+ public String toString() {
+ if (this == EMPTY) {
+ return "";
+ }
+ if (Strings.isBlank(namespace)) {
+ return name;
+ }
+ return name + "." + namespace;
+ }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
index 42dfdfe7d9..90e7ce96ab 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
import static java.util.Objects.isNull;
-import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import java.time.Duration;
import java.util.ArrayList;
@@ -34,6 +33,7 @@ import org.apache.skywalking.library.kubernetes.KubernetesClient;
import org.apache.skywalking.library.kubernetes.KubernetesEndpoints;
import org.apache.skywalking.library.kubernetes.KubernetesPods;
import org.apache.skywalking.library.kubernetes.KubernetesServices;
+import org.apache.skywalking.library.kubernetes.ObjectID;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
@@ -95,19 +95,13 @@ public class K8SServiceRegistry {
ipServiceMetaInfoMap = cacheBuilder.build(new CacheLoader<String, ServiceMetaInfo>() {
@Override
public ServiceMetaInfo load(String ip) throws Exception {
- final Optional<V1Pod> pod =
- KubernetesPods.INSTANCE
- .list()
- .stream()
- .filter(it -> it.getStatus() != null)
- .filter(it -> Objects.equals(ip, requireNonNull(it.getStatus()).getPodIP()))
- .findFirst();
+ final Optional<V1Pod> pod = KubernetesPods.INSTANCE.findByIP(ip);
if (!pod.isPresent()) {
log.debug("No corresponding Pod for IP: {}", ip);
return config.serviceMetaInfoFactory().unknown();
}
- final Optional<Tuple2<String, String>> /* <namespace, serviceName> */ serviceID =
+ final Optional<ObjectID> serviceID =
KubernetesEndpoints.INSTANCE
.list()
.stream()
@@ -121,7 +115,11 @@ public class K8SServiceRegistry {
.filter(subset -> subset.getAddresses() != null)
.flatMap(subset -> subset.getAddresses().stream())
.anyMatch(address -> Objects.equals(ip, address.getIp()))) {
- return metadataID(metadata);
+ return ObjectID
+ .builder()
+ .name(metadata.getName())
+ .namespace(metadata.getNamespace())
+ .build();
}
return null;
})
@@ -133,14 +131,7 @@ public class K8SServiceRegistry {
}
final Optional<V1Service> service =
- KubernetesServices.INSTANCE
- .list()
- .stream()
- .filter(it -> it.getMetadata() != null)
- .filter(it -> Objects.equals(
- metadataID(it.getMetadata()),
- serviceID.get()))
- .findFirst();
+ KubernetesServices.INSTANCE.findByID(serviceID.get());
if (!service.isPresent()) {
log.debug("No service for namespace and name: {}", serviceID.get());
return config.serviceMetaInfoFactory().unknown();