You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/03/30 00:48:45 UTC

[skywalking] branch master updated: Add arg `namespace` to func `retagByK8sMeta`, rebuild the relationship between pod and service by labels (#6650)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 523944a  Add arg `namespace` to func `retagByK8sMeta`, rebuild the relationship between pod and service by labels (#6650)
523944a is described below

commit 523944a59f7e6f8a3ed6f92b50401f733ffad6a5
Author: wankai123 <wa...@foxmail.com>
AuthorDate: Tue Mar 30 08:48:22 2021 +0800

    Add arg `namespace` to func `retagByK8sMeta`, rebuild the relationship between pod and service by labels (#6650)
    
    * Add `namespace` to func `retagByK8sMeta`, rebuild the relationship between pod and service by labels
    
    * add param check
---
 docs/en/concepts-and-designs/mal.md                |   8 +-
 .../oap/meter/analyzer/dsl/SampleFamily.java       |   5 +-
 .../meter/analyzer/dsl/tagOpt/K8sRetagType.java    |  11 +-
 .../oap/meter/analyzer/dsl/tagOpt/Retag.java       |   2 +-
 .../oap/meter/analyzer/k8s/K8sInfoRegistry.java    | 144 +++++++++++----------
 .../oap/meter/analyzer/dsl/K8sTagTest.java         | 116 +++++++++++++----
 6 files changed, 182 insertions(+), 104 deletions(-)

diff --git a/docs/en/concepts-and-designs/mal.md b/docs/en/concepts-and-designs/mal.md
index 4a3d7fb..9a478d5 100644
--- a/docs/en/concepts-and-designs/mal.md
+++ b/docs/en/concepts-and-designs/mal.md
@@ -65,22 +65,22 @@ MAL supports using the metadata of k8s to manipulate the tags and their values.
 This feature requires OAP Server to have the authority to access the K8s's `API Server`.
 
 ##### retagByK8sMeta
-`retagByK8sMeta(newLabelName, K8sRetagType, existingLabelName)`. Add a new tag to the sample family based on an existing label's value. Provide several internal converting types, including
+`retagByK8sMeta(newLabelName, K8sRetagType, existingLabelName, namespaceLabelName)`. Add a new tag to the sample family based on an existing label's value. Provide several internal converting types, including
 - K8sRetagType.Pod2Service  
 
 Add a tag to the sample by using `service` as the key, `$serviceName.$namespace` as the value, by the given value of the tag key, which represents the name of a pod.
 
 For example:
 ```
-container_cpu_usage_seconds_total{container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh} 2
+container_cpu_usage_seconds_total{namespace=default, container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh} 2
 ```
 Expression:
 ```
-container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')
+container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod' , 'namespace')
 ```
 Output:
 ```
-container_cpu_usage_seconds_total{container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh, service='nginx-service.default'} 2
+container_cpu_usage_seconds_total{namespace=default, container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh, service='nginx-service.default'} 2
 ```
 
 ### Binary operators
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index b4c82d0..efe2dca 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -312,15 +312,16 @@ public class SampleFamily {
     }
 
     /* k8s retags*/
-    public SampleFamily retagByK8sMeta(String newLabelName, K8sRetagType type, String existingLabelName) {
+    public SampleFamily retagByK8sMeta(String newLabelName, K8sRetagType type, String existingLabelName, String namespaceLabelName) {
         Preconditions.checkArgument(!Strings.isNullOrEmpty(newLabelName));
         Preconditions.checkArgument(!Strings.isNullOrEmpty(existingLabelName));
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(namespaceLabelName));
         ExpressionParsingContext.get().ifPresent(ctx -> ctx.isRetagByK8sMeta = true);
         if (this == EMPTY) {
             return EMPTY;
         }
 
-        return SampleFamily.build(this.context, type.execute(samples, newLabelName, existingLabelName));
+        return SampleFamily.build(this.context, type.execute(samples, newLabelName, existingLabelName, namespaceLabelName));
     }
 
     public SampleFamily histogram() {
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/K8sRetagType.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/K8sRetagType.java
index 8647831..5b179f6 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/K8sRetagType.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/K8sRetagType.java
@@ -30,12 +30,15 @@ public enum K8sRetagType implements Retag {
 
     Pod2Service {
         @Override
-        public Sample[] execute(final Sample[] ss, final String newLabelName, final String existingLabelName) {
+        public Sample[] execute(final Sample[] ss,
+                                final String newLabelName,
+                                final String existingLabelName,
+                                final String namespaceLabelName) {
             Sample[] samples = Arrays.stream(ss).map(sample -> {
                 String podName = sample.getLabels().get(existingLabelName);
-
-                if (!Strings.isNullOrEmpty(podName)) {
-                    String serviceName = K8sInfoRegistry.getInstance().findServiceName(podName);
+                String namespace = sample.getLabels().get(namespaceLabelName);
+                if (!Strings.isNullOrEmpty(podName) && !Strings.isNullOrEmpty(namespace)) {
+                    String serviceName = K8sInfoRegistry.getInstance().findServiceName(namespace, podName);
                     if (!Strings.isNullOrEmpty(serviceName)) {
                         Map<String, String> labels = Maps.newHashMap(sample.getLabels());
                         labels.put(newLabelName, serviceName);
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/Retag.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/Retag.java
index eda574e..e02a971 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/Retag.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/Retag.java
@@ -21,5 +21,5 @@ package org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
 
 public interface Retag {
-    Sample[] execute(Sample[] ss, String newLabelName, String existingLabelName);
+    Sample[] execute(Sample[] ss, String newLabelName, String existingLabelName, String namespaceLabelName);
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
index 694cd31..aa49b05 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
@@ -18,19 +18,21 @@
 
 package org.apache.skywalking.oap.meter.analyzer.k8s;
 
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.kubernetes.client.informer.ResourceEventHandler;
 import io.kubernetes.client.informer.SharedInformerFactory;
 import io.kubernetes.client.openapi.Configuration;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1Endpoints;
-import io.kubernetes.client.openapi.models.V1EndpointsList;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1ServiceList;
 import io.kubernetes.client.util.Config;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.models.V1Pod;
+import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -39,7 +41,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.Objects.isNull;
 import static java.util.Optional.ofNullable;
 
@@ -48,10 +49,11 @@ public class K8sInfoRegistry {
 
     private final static K8sInfoRegistry INSTANCE = new K8sInfoRegistry();
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
-    private final Map<String/* ip */, V1Pod> ipPodMap = new ConcurrentHashMap<>();
-    private final Map<String/* ip */, String/* serviceName.namespace */> ipServiceMap = new ConcurrentHashMap<>();
-    private final Map<String/* podName */, String /* serviceName.namespace */> podServiceMap = new ConcurrentHashMap<>();
+    private final Map<String/* podName.namespace */, V1Pod> namePodMap = new ConcurrentHashMap<>();
+    protected final Map<String/* serviceName.namespace  */, V1Service> nameServiceMap = new ConcurrentHashMap<>();
+    private final Map<String/* podName.namespace */, String /* serviceName.namespace */> podServiceMap = new ConcurrentHashMap<>();
     private ExecutorService executor;
+    private static final String SEPARATOR = ".";
 
     public static K8sInfoRegistry getInstance() {
         return INSTANCE;
@@ -79,16 +81,15 @@ public class K8sInfoRegistry {
 
             final CoreV1Api coreV1Api = new CoreV1Api();
             final SharedInformerFactory factory = new SharedInformerFactory(executor);
-
-            listenEndpointsEvents(coreV1Api, factory);
+            listenServiceEvents(coreV1Api, factory);
             listenPodEvents(coreV1Api, factory);
             factory.startAllRegisteredInformers();
         }
     }
 
-    private void listenEndpointsEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
+    private void listenServiceEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
         factory.sharedIndexInformerFor(
-            params -> coreV1Api.listEndpointsForAllNamespacesCall(
+            params -> coreV1Api.listServiceForAllNamespacesCall(
                 null,
                 null,
                 null,
@@ -100,22 +101,22 @@ public class K8sInfoRegistry {
                 params.watch,
                 null
             ),
-            V1Endpoints.class,
-            V1EndpointsList.class
-        ).addEventHandler(new ResourceEventHandler<V1Endpoints>() {
+            V1Service.class,
+            V1ServiceList.class
+        ).addEventHandler(new ResourceEventHandler<V1Service>() {
             @Override
-            public void onAdd(final V1Endpoints endpoints) {
-                addEndpoints(endpoints);
+            public void onAdd(final V1Service service) {
+                addService(service);
             }
 
             @Override
-            public void onUpdate(final V1Endpoints oldEndpoints, final V1Endpoints newEndpoints) {
-                addEndpoints(newEndpoints);
+            public void onUpdate(final V1Service oldService, final V1Service newService) {
+                addService(newService);
             }
 
             @Override
-            public void onDelete(final V1Endpoints endpoints, final boolean deletedFinalStateUnknown) {
-                removeEndpoints(endpoints);
+            public void onDelete(final V1Service service, final boolean deletedFinalStateUnknown) {
+                removeService(service);
             }
         });
     }
@@ -154,71 +155,76 @@ public class K8sInfoRegistry {
         });
     }
 
-    private void addPod(final V1Pod pod) {
-        ofNullable(pod.getStatus()).ifPresent(
-            status -> ofNullable(status.getPodIP()).ifPresent(
-                ip -> ipPodMap.put(ip, pod))
+    protected void addService(final V1Service service) {
+        ofNullable(service.getMetadata()).ifPresent(
+            metadata -> nameServiceMap.put(metadata.getName() + SEPARATOR + metadata.getNamespace(), service)
         );
-
         recompose();
     }
 
-    private void removePod(final V1Pod pod) {
-        ofNullable(pod.getStatus()).ifPresent(
-            status -> ipPodMap.remove(status.getPodIP())
-        );
-        ofNullable(pod.getMetadata()).ifPresent(
-            metadata -> podServiceMap.remove(pod.getMetadata().getName())
+    protected void removeService(final V1Service service) {
+        ofNullable(service.getMetadata()).ifPresent(
+            metadata -> nameServiceMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace())
         );
+        recompose();
     }
 
-    private void addEndpoints(final V1Endpoints endpoints) {
-        V1ObjectMeta endpointsMetadata = endpoints.getMetadata();
-        if (isNull(endpointsMetadata)) {
-            log.error("Endpoints metadata is null: {}", endpoints);
-            return;
-        }
-
-        final String namespace = endpointsMetadata.getNamespace();
-        final String name = endpointsMetadata.getName();
-
-        ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
-            subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
-                address -> ipServiceMap.put(address.getIp(), name + "." + namespace)
-            ))
-        ));
+    protected void addPod(final V1Pod pod) {
+        ofNullable(pod.getMetadata()).ifPresent(
+            metadata -> namePodMap.put(metadata.getName() + SEPARATOR + metadata.getNamespace(), pod));
 
         recompose();
     }
 
-    private void removeEndpoints(final V1Endpoints endpoints) {
-        ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
-            subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
-                address -> ipServiceMap.remove(address.getIp())
-            ))
-        ));
-        recompose();
+    protected void removePod(final V1Pod pod) {
+        ofNullable(pod.getMetadata()).ifPresent(
+            metadata -> namePodMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace()));
+
+        ofNullable(pod.getMetadata()).ifPresent(
+            metadata -> podServiceMap.remove(metadata.getName() + SEPARATOR + metadata.getNamespace()));
     }
 
     private void recompose() {
-        ipPodMap.forEach((ip, pod) -> {
-            final String namespaceService = ipServiceMap.get(ip);
-            if (isNullOrEmpty(namespaceService)) {
-                podServiceMap.remove(ip);
-                return;
-            }
-
-            final V1ObjectMeta podMetadata = pod.getMetadata();
-            if (isNull(podMetadata)) {
-                log.warn("Pod metadata is null, {}", pod);
-                return;
-            }
-
-            podServiceMap.put(pod.getMetadata().getName(), namespaceService);
+        namePodMap.forEach((podName, pod) -> {
+            nameServiceMap.forEach((serviceName, service) -> {
+                if (isNull(pod.getMetadata()) || isNull(service.getMetadata()) || isNull(service.getSpec())) {
+                    return;
+                }
+
+                Map<String, String> selector = service.getSpec().getSelector();
+                Map<String, String> labels = pod.getMetadata().getLabels();
+
+                if (isNull(labels) || isNull(selector)) {
+                    return;
+                }
+
+                String podNamespace = pod.getMetadata().getNamespace();
+                String serviceNamespace = service.getMetadata().getNamespace();
+
+                if (Strings.isNullOrEmpty(podNamespace) || Strings.isNullOrEmpty(
+                    serviceNamespace) || !podNamespace.equals(serviceNamespace)) {
+                    return;
+                }
+
+                if (hasIntersection(selector.entrySet(), labels.entrySet())) {
+                    podServiceMap.put(podName, serviceName);
+                }
+            });
         });
     }
 
-    public String findServiceName(String podName) {
-        return this.podServiceMap.get(podName);
+    public String findServiceName(String namespace, String podName) {
+        return this.podServiceMap.get(podName + SEPARATOR + namespace);
+    }
+
+    private boolean hasIntersection(Collection<?> o, Collection<?> c) {
+        Objects.requireNonNull(o);
+        Objects.requireNonNull(c);
+        for (final Object value : o) {
+            if (c.contains(value)) {
+                return true;
+            }
+        }
+        return false;
     }
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
index 1996287..fbdf1ad 100644
--- a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
@@ -19,8 +19,14 @@
 package org.apache.skywalking.oap.meter.analyzer.dsl;
 
 import com.google.common.collect.ImmutableMap;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
 import org.junit.Before;
@@ -28,13 +34,13 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
 import org.powermock.reflect.Whitebox;
 
 import static com.google.common.collect.ImmutableMap.of;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
 
 @Slf4j
 @RunWith(Parameterized.class)
@@ -63,24 +69,28 @@ public class K8sTagTest {
                 of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
                     Sample.builder()
                           .labels(
-                              of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-mbczh"))
+                              of(
+                                  "namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
+                                  "my-nginx-5dc4865748-mbczh"
+                              ))
                           .value(2)
                           .build(),
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "kube-state-metrics", "cpu", "total", "pod",
+                                  "namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
                                   "kube-state-metrics-6f979fd498-z7xwx"
                               ))
                           .value(1)
                           .build()
                 ).build()),
-                "container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
+                "container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod' , 'namespace')",
                 Result.success(SampleFamilyBuilder.newBuilder(
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-mbczh",
+                                  "namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
+                                  "my-nginx-5dc4865748-mbczh",
                                   "service", "nginx-service.default"
                               ))
                           .value(2)
@@ -88,7 +98,7 @@ public class K8sTagTest {
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "kube-state-metrics", "cpu", "total", "pod",
+                                  "namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
                                   "kube-state-metrics-6f979fd498-z7xwx",
                                   "service", "kube-state-metrics.kube-system"
                               ))
@@ -102,31 +112,35 @@ public class K8sTagTest {
                 of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
                     Sample.builder()
                           .labels(
-                              of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-pod"))
+                              of(
+                                  "namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
+                                  "my-nginx-5dc4865748-no-pod"
+                              ))
                           .value(2)
                           .build(),
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "kube-state-metrics", "cpu", "total", "pod",
+                                  "namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
                                   "kube-state-metrics-6f979fd498-z7xwx"
                               ))
                           .value(1)
                           .build()
                 ).build()),
-                "container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
+                "container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod' , 'namespace')",
                 Result.success(SampleFamilyBuilder.newBuilder(
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-pod"
+                                  "namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
+                                  "my-nginx-5dc4865748-no-pod"
                               ))
                           .value(2)
                           .build(),
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "kube-state-metrics", "cpu", "total", "pod",
+                                  "namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
                                   "kube-state-metrics-6f979fd498-z7xwx",
                                   "service", "kube-state-metrics.kube-system"
                               ))
@@ -140,31 +154,35 @@ public class K8sTagTest {
                 of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
                     Sample.builder()
                           .labels(
-                              of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-service"))
+                              of(
+                                  "namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
+                                  "my-nginx-5dc4865748-no-service"
+                              ))
                           .value(2)
                           .build(),
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "kube-state-metrics", "cpu", "total", "pod",
+                                  "namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
                                   "kube-state-metrics-6f979fd498-z7xwx"
                               ))
                           .value(1)
                           .build()
                 ).build()),
-                "container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
+                "container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod' , 'namespace')",
                 Result.success(SampleFamilyBuilder.newBuilder(
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-service"
+                                  "namespace", "default", "container", "my-nginx", "cpu", "total", "pod",
+                                  "my-nginx-5dc4865748-no-service"
                               ))
                           .value(2)
                           .build(),
                     Sample.builder()
                           .labels(
                               of(
-                                  "container", "kube-state-metrics", "cpu", "total", "pod",
+                                  "namespace", "kube-system", "container", "kube-state-metrics", "cpu", "total", "pod",
                                   "kube-state-metrics-6f979fd498-z7xwx",
                                   "service", "kube-state-metrics.kube-system"
                               ))
@@ -176,19 +194,44 @@ public class K8sTagTest {
             });
     }
 
+    @SneakyThrows
     @Before
     public void setup() {
         Whitebox.setInternalState(K8sInfoRegistry.class, "INSTANCE",
                                   Mockito.spy(K8sInfoRegistry.getInstance())
         );
-        when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-mbczh")).thenReturn(
-            "nginx-service.default");
-        when(K8sInfoRegistry.getInstance().findServiceName("kube-state-metrics-6f979fd498-z7xwx")).thenReturn(
-            "kube-state-metrics.kube-system");
-        when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-no-pod")).thenReturn(
-            null);
-        when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-no-service")).thenReturn(
-            null);
+
+        PowerMockito.when(
+            K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx")))
+                    .thenCallRealMethod();
+        PowerMockito.when(
+            K8sInfoRegistry.getInstance(), "addService",
+            mockService("kube-state-metrics", "kube-system", of("run", "kube-state-metrics"))
+        ).thenCallRealMethod();
+        PowerMockito.when(
+            K8sInfoRegistry.getInstance(), "addPod",
+            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+        ).thenCallRealMethod();
+        PowerMockito.when(
+            K8sInfoRegistry.getInstance(), "addPod",
+            mockPod("kube-state-metrics-6f979fd498-z7xwx", "kube-system", of("run", "kube-state-metrics"))
+        ).thenCallRealMethod();
+
+        PowerMockito.when(
+            K8sInfoRegistry.getInstance(), "removeService", mockService("nginx-service", "default", of("run", "nginx")))
+                    .thenCallRealMethod();
+        PowerMockito.when(
+            K8sInfoRegistry.getInstance(), "removePod",
+            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+        ).thenCallRealMethod();
+        PowerMockito.when(
+            K8sInfoRegistry.getInstance(), "addService", mockService("nginx-service", "default", of("run", "nginx")))
+                    .thenCallRealMethod();
+        PowerMockito.when(
+            K8sInfoRegistry.getInstance(), "addPod",
+            mockPod("my-nginx-5dc4865748-mbczh", "default", of("run", "nginx"))
+        ).thenCallRealMethod();
+
     }
 
     @Test
@@ -209,4 +252,29 @@ public class K8sTagTest {
         }
         assertThat(r, is(want));
     }
+
+    private V1Service mockService(String name, String namespace, Map<String, String> selector) {
+        V1Service service = new V1Service();
+        V1ObjectMeta serviceMeta = new V1ObjectMeta();
+        V1ServiceSpec v1ServiceSpec = new V1ServiceSpec();
+
+        serviceMeta.setName(name);
+        serviceMeta.setNamespace(namespace);
+        service.setMetadata(serviceMeta);
+        v1ServiceSpec.setSelector(selector);
+        service.setSpec(v1ServiceSpec);
+
+        return service;
+    }
+
+    private V1Pod mockPod(String name, String namespace, Map<String, String> labels) {
+        V1Pod v1Pod = new V1Pod();
+        V1ObjectMeta podMeta = new V1ObjectMeta();
+        podMeta.setName(name);
+        podMeta.setNamespace(namespace);
+        podMeta.setLabels(labels);
+        v1Pod.setMetadata(podMeta);
+
+        return v1Pod;
+    }
 }
\ No newline at end of file