You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/10/18 05:02:22 UTC

[skywalking] branch master updated: Improve Kubernetes Pod lister with field selector (#9795)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 5abe6ceb1f Improve Kubernetes Pod lister with field selector (#9795)
5abe6ceb1f is described below

commit 5abe6ceb1ff0938be917f8b440c5eb3590cf31db
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Tue Oct 18 13:02:07 2022 +0800

    Improve Kubernetes Pod lister with field selector (#9795)
---
 .../oap/meter/analyzer/k8s/K8sInfoRegistry.java    | 76 ++++++++++++----------
 .../oap/meter/analyzer/dsl/K8sTagTest.java         | 26 ++++++--
 .../library/kubernetes/KubernetesPods.java         | 51 ++++++++++-----
 .../library/kubernetes/KubernetesServices.java     | 19 ++++++
 .../skywalking/library/kubernetes/ObjectID.java    | 47 +++++++++++++
 .../receiver/envoy/als/k8s/K8SServiceRegistry.java | 27 +++-----
 6 files changed, 170 insertions(+), 76 deletions(-)

diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
index 483b4a9a58..a82e185181 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
@@ -26,34 +26,31 @@ import java.util.Objects;
 import java.util.Optional;
 import org.apache.skywalking.library.kubernetes.KubernetesPods;
 import org.apache.skywalking.library.kubernetes.KubernetesServices;
+import org.apache.skywalking.library.kubernetes.ObjectID;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1Pod;
 import io.kubernetes.client.openapi.models.V1Service;
 import lombok.SneakyThrows;
 
 public class K8sInfoRegistry {
-
     private final static K8sInfoRegistry INSTANCE = new K8sInfoRegistry();
-    private final LoadingCache<String/* podName.namespace */, String /* serviceName.namespace */> podServiceMap;
-    private final LoadingCache<String/* podIP */, String /* podName.namespace */> ipPodMap;
-    private final LoadingCache<String/* serviceIP */, String /* serviceName.namespace */> ipServiceMap;
-    private static final String SEPARATOR = ".";
+    private final LoadingCache<ObjectID /* Pod */, ObjectID /* Service */> podServiceMap;
+    private final LoadingCache<String/* podIP */, ObjectID /* Pod */> ipPodMap;
+    private final LoadingCache<String/* serviceIP */, ObjectID /* Service */> ipServiceMap;
 
     private K8sInfoRegistry() {
         ipPodMap = CacheBuilder.newBuilder()
             .expireAfterWrite(Duration.ofMinutes(3))
             .build(CacheLoader.from(ip -> KubernetesPods.INSTANCE
-                .list()
-                .stream()
-                .filter(it -> it.getStatus() != null)
-                .filter(it -> it.getMetadata() != null)
-                .filter(it -> Objects.equals(it.getStatus().getPodIP(), ip))
-                .map(it -> metadataID(it.getMetadata()))
-                .findFirst()
-                .orElse("")));
+                .findByIP(ip)
+                .map(it -> ObjectID
+                    .builder()
+                    .name(it.getMetadata().getName())
+                    .namespace(it.getMetadata().getNamespace())
+                    .build())
+                .orElse(ObjectID.EMPTY)));
         ipServiceMap = CacheBuilder.newBuilder()
             .expireAfterWrite(Duration.ofMinutes(3))
             .build(CacheLoader.from(ip -> KubernetesServices.INSTANCE
@@ -69,25 +66,28 @@ public class K8sInfoRegistry {
                         it.getStatus().getLoadBalancer().getIngress() != null &&
                         it.getStatus().getLoadBalancer().getIngress().stream()
                             .anyMatch(ingress -> Objects.equals(ingress.getIp(), ip))))
-                .map(it -> metadataID(it.getMetadata()))
+                .map(it -> ObjectID
+                    .builder()
+                    .name(it.getMetadata().getName())
+                    .namespace(it.getMetadata().getNamespace())
+                    .build())
                 .findFirst()
-                .orElse("")));
+                .orElse(ObjectID.EMPTY)));
         podServiceMap = CacheBuilder.newBuilder()
             .expireAfterWrite(Duration.ofMinutes(3))
-            .build(CacheLoader.from(podMetadataID -> {
+            .build(CacheLoader.from(podObjectID -> {
                 final Optional<V1Pod> pod = KubernetesPods.INSTANCE
-                    .list()
-                    .stream()
-                    .filter(it -> it.getMetadata() != null)
-                    .filter(it -> Objects.equals(
-                        metadataID(it.getMetadata()),
-                        podMetadataID))
-                    .findFirst();
+                    .findByObjectID(
+                        ObjectID
+                            .builder()
+                            .name(podObjectID.name())
+                            .namespace(podObjectID.namespace())
+                            .build());
 
                 if (!pod.isPresent()
                     || pod.get().getMetadata() == null
                     || pod.get().getMetadata().getLabels() == null) {
-                    return "";
+                    return ObjectID.EMPTY;
                 }
 
                 final Optional<V1Service> service = KubernetesServices.INSTANCE
@@ -103,11 +103,13 @@ public class K8sInfoRegistry {
                     })
                     .findFirst();
                 if (!service.isPresent()) {
-                    return "";
+                    return ObjectID.EMPTY;
                 }
-                return service.get().getMetadata().getName()
-                    + SEPARATOR
-                    + service.get().getMetadata().getNamespace();
+                return ObjectID
+                    .builder()
+                    .name(service.get().getMetadata().getName())
+                    .namespace(service.get().getMetadata().getNamespace())
+                    .build();
             }));
     }
 
@@ -117,17 +119,23 @@ public class K8sInfoRegistry {
 
     @SneakyThrows
     public String findServiceName(String namespace, String podName) {
-        return this.podServiceMap.get(podName + SEPARATOR + namespace);
+        return this.podServiceMap.get(
+            ObjectID
+                .builder()
+                .name(podName)
+                .namespace(namespace)
+                .build())
+            .toString();
     }
 
     @SneakyThrows
     public String findPodByIP(String ip) {
-        return this.ipPodMap.get(ip);
+        return this.ipPodMap.get(ip).toString();
     }
 
     @SneakyThrows
     public String findServiceByIP(String ip) {
-        return this.ipServiceMap.get(ip);
+        return this.ipServiceMap.get(ip).toString();
     }
 
     private boolean hasIntersection(Collection<?> o, Collection<?> c) {
@@ -140,8 +148,4 @@ public class K8sInfoRegistry {
         }
         return true;
     }
-
-    String metadataID(final V1ObjectMeta metadata) {
-        return metadata.getName() + SEPARATOR + metadata.getNamespace();
-    }
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
index a8abbfc9d6..f0f7fd50a5 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
@@ -30,12 +30,13 @@ import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
-
+import java.util.Optional;
 import io.kubernetes.client.openapi.models.V1ServiceStatus;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.kubernetes.KubernetesPods;
 import org.apache.skywalking.library.kubernetes.KubernetesServices;
+import org.apache.skywalking.library.kubernetes.ObjectID;
 import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.Retag;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.junit.Before;
@@ -56,7 +57,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
 @Slf4j
-@PowerMockIgnore("javax.net.ssl.*")
+@PowerMockIgnore({"javax.net.ssl.*", "javax.management.*"})
 @RunWith(PowerMockRunner.class)
 @PowerMockRunnerDelegate(Parameterized.class)
 @PrepareForTest({KubernetesPods.class, KubernetesServices.class})
@@ -266,12 +267,27 @@ public class K8sTagTest {
                                   Mockito.mock(KubernetesPods.class)
         );
 
-        PowerMockito.when(KubernetesServices.INSTANCE, "list").thenReturn(ImmutableList.of(
+        PowerMockito.when(KubernetesServices.INSTANCE.list()).thenReturn(ImmutableList.of(
                 mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"),
                 mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"), "2.2.2.2")));
-        PowerMockito.when(KubernetesPods.INSTANCE, "list").thenReturn(ImmutableList.of(
+        ImmutableList.of(
+            mockService("nginx-service", "default", of("run", "nginx"), "2.2.2.1"),
+            mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"), "2.2.2.2"))
+            .forEach(svc ->
+                PowerMockito
+                .when(KubernetesServices.INSTANCE.findByID(ObjectID.builder().namespace(svc.getMetadata().getNamespace()).name(svc.getMetadata().getName()).build()))
+                .thenReturn(Optional.of(svc))
+            );
+        ImmutableList.of(
             mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"), "1.1.1.1"),
-            mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"), "1.1.1.2")));
+            mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"), "1.1.1.2"))
+            .forEach(pod -> {
+                PowerMockito
+                .when(KubernetesPods.INSTANCE.findByIP(pod.getStatus().getPodIP()))
+                .thenReturn(Optional.of(pod));
+                PowerMockito
+                .when(KubernetesPods.INSTANCE.findByObjectID(ObjectID.builder().name(pod.getMetadata().getName()).namespace(pod.getMetadata().getNamespace()).build())).thenReturn(Optional.of(pod));
+        });
     }
 
     @Test
diff --git a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java
index bb862f81db..776789288f 100644
--- a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java
+++ b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java
@@ -20,13 +20,10 @@
 package org.apache.skywalking.library.kubernetes;
 
 import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import io.kubernetes.client.openapi.ApiException;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.openapi.models.V1Pod;
 import lombok.SneakyThrows;
@@ -34,7 +31,10 @@ import lombok.SneakyThrows;
 public enum KubernetesPods {
     INSTANCE;
 
-    private final LoadingCache<KubernetesPods, List<V1Pod>> pods;
+    private static final String FIELD_SELECTOR_PATTERN_POD_IP = "status.podIP=%s";
+
+    private final LoadingCache<String, Optional<V1Pod>> podByIP;
+    private final LoadingCache<ObjectID, Optional<V1Pod>> podByObjectID;
 
     @SneakyThrows
     private KubernetesPods() {
@@ -43,24 +43,41 @@ public enum KubernetesPods {
         final CoreV1Api coreV1Api = new CoreV1Api();
         final CacheBuilder<Object, Object> cacheBuilder =
             CacheBuilder.newBuilder()
-                .expireAfterAccess(Duration.ofMinutes(3));
+                .expireAfterAccess(Duration.ofMinutes(5));
 
-        pods = cacheBuilder.build(CacheLoader.from(() -> {
-            try {
+        podByIP = cacheBuilder.build(new CacheLoader<String, Optional<V1Pod>>() {
+            @Override
+            public Optional<V1Pod> load(String ip) throws Exception {
                 return coreV1Api
                     .listPodForAllNamespaces(
-                        null, null, null, null, null,
-                        null, null, null, null, null)
-                    .getItems();
-            } catch (ApiException e) {
-                LoggerFactory.getLogger(getClass()).error("Failed to list Pods.", e);
-                return Collections.emptyList();
+                        null, null, String.format(FIELD_SELECTOR_PATTERN_POD_IP, ip),
+                        null, null, null, null, null, null, null)
+                    .getItems()
+                    .stream()
+                    .findFirst();
+            }
+        });
+
+        podByObjectID = cacheBuilder.build(new CacheLoader<ObjectID, Optional<V1Pod>>() {
+            @Override
+            public Optional<V1Pod> load(ObjectID objectID) throws Exception {
+                return Optional.ofNullable(
+                    coreV1Api
+                        .readNamespacedPod(
+                            objectID.name(),
+                            objectID.namespace(),
+                            null));
             }
-        }));
+        });
+    }
+
+    @SneakyThrows
+    public Optional<V1Pod> findByIP(final String ip) {
+        return podByIP.get(ip);
     }
 
     @SneakyThrows
-    public List<V1Pod> list() {
-        return pods.get(this);
+    public Optional<V1Pod> findByObjectID(final ObjectID id) {
+        return podByObjectID.get(id);
     }
 }
diff --git a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java
index ed128d9e65..aff56f2874 100644
--- a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java
+++ b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java
@@ -22,6 +22,7 @@ package org.apache.skywalking.library.kubernetes;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import org.slf4j.LoggerFactory;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -35,6 +36,7 @@ public enum KubernetesServices {
     INSTANCE;
 
     private final LoadingCache<KubernetesServices, List<V1Service>> services;
+    private final LoadingCache<ObjectID, Optional<V1Service>> serviceByID;
 
     @SneakyThrows
     private KubernetesServices() {
@@ -57,10 +59,27 @@ public enum KubernetesServices {
                 return Collections.emptyList();
             }
         }));
+
+        serviceByID = cacheBuilder.build(new CacheLoader<ObjectID, Optional<V1Service>>() {
+            @Override
+            public Optional<V1Service> load(ObjectID id) throws Exception {
+                return Optional.ofNullable(
+                    coreV1Api
+                        .readNamespacedService(
+                            id.name(),
+                            id.namespace(),
+                            null));
+            }
+        });
     }
 
     @SneakyThrows
     public List<V1Service> list() {
         return services.get(this);
     }
+
+    @SneakyThrows
+    public Optional<V1Service> findByID(final ObjectID id) {
+        return serviceByID.get(id);
+    }
 }
diff --git a/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/ObjectID.java b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/ObjectID.java
new file mode 100644
index 0000000000..a9516e5659
--- /dev/null
+++ b/oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/ObjectID.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+ package org.apache.skywalking.library.kubernetes;
+
+import org.apache.logging.log4j.util.Strings;
+import lombok.Builder;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Accessors;
+
+@Data
+@Builder
+@RequiredArgsConstructor
+@Accessors(fluent = true)
+public class ObjectID {
+    public static final ObjectID EMPTY = ObjectID.builder().build();
+
+    private final String name;
+    private final String namespace;
+
+    @Override
+    public String toString() {
+        if (this == EMPTY) {
+            return "";
+        }
+        if (Strings.isBlank(namespace)) {
+            return name;
+        }
+        return name + "." + namespace;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
index 42dfdfe7d9..90e7ce96ab 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
 
 import static java.util.Objects.isNull;
-import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -34,6 +33,7 @@ import org.apache.skywalking.library.kubernetes.KubernetesClient;
 import org.apache.skywalking.library.kubernetes.KubernetesEndpoints;
 import org.apache.skywalking.library.kubernetes.KubernetesPods;
 import org.apache.skywalking.library.kubernetes.KubernetesServices;
+import org.apache.skywalking.library.kubernetes.ObjectID;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
@@ -95,19 +95,13 @@ public class K8SServiceRegistry {
         ipServiceMetaInfoMap = cacheBuilder.build(new CacheLoader<String, ServiceMetaInfo>() {
             @Override
             public ServiceMetaInfo load(String ip) throws Exception {
-                final Optional<V1Pod> pod =
-                    KubernetesPods.INSTANCE
-                        .list()
-                        .stream()
-                        .filter(it -> it.getStatus() != null)
-                        .filter(it -> Objects.equals(ip, requireNonNull(it.getStatus()).getPodIP()))
-                        .findFirst();
+                final Optional<V1Pod> pod = KubernetesPods.INSTANCE.findByIP(ip);
                 if (!pod.isPresent()) {
                     log.debug("No corresponding Pod for IP: {}", ip);
                     return config.serviceMetaInfoFactory().unknown();
                 }
 
-                final Optional<Tuple2<String, String>> /* <namespace, serviceName> */ serviceID =
+                final Optional<ObjectID> serviceID =
                     KubernetesEndpoints.INSTANCE
                         .list()
                         .stream()
@@ -121,7 +115,11 @@ public class K8SServiceRegistry {
                                 .filter(subset -> subset.getAddresses() != null)
                                 .flatMap(subset -> subset.getAddresses().stream())
                                 .anyMatch(address -> Objects.equals(ip, address.getIp()))) {
-                                return metadataID(metadata);
+                                return ObjectID
+                                    .builder()
+                                    .name(metadata.getName())
+                                    .namespace(metadata.getNamespace())
+                                    .build();
                             }
                             return null;
                         })
@@ -133,14 +131,7 @@ public class K8SServiceRegistry {
                 }
 
                 final Optional<V1Service> service =
-                    KubernetesServices.INSTANCE
-                        .list()
-                        .stream()
-                        .filter(it -> it.getMetadata() != null)
-                        .filter(it -> Objects.equals(
-                            metadataID(it.getMetadata()),
-                            serviceID.get()))
-                        .findFirst();
+                    KubernetesServices.INSTANCE.findByID(serviceID.get());
                 if (!service.isPresent()) {
                     log.debug("No service for namespace and name: {}", serviceID.get());
                     return config.serviceMetaInfoFactory().unknown();