You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2019/03/12 08:36:41 UTC

[incubator-skywalking] branch master updated: Make k8s-Coordinator works in role mode. (#2347)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 0951b94  Make k8s-Coordinator works in role mode. (#2347)
0951b94 is described below

commit 0951b9434b6f043c99dab697dd6db629a6669178
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Mar 12 16:36:36 2019 +0800

    Make k8s-Coordinator works in role mode. (#2347)
    
    * Make k8s-Coordinator back in role mode.
    
    * Start query API server after the core started.
    
    * Do lock when port has been intialized.
    
    * Rename service.
    
    * Fix text cases.
    
    * Declare ConfigService.class in core module.
    
    * Implement in an easier way.
    
    * Envoy print whole metric, now in debug.
    
    * Forcedly fix envoy timestamp issue.
---
 .../cluster-kubernetes-plugin/pom.xml              |  6 +++
 .../ClusterModuleKubernetesProvider.java           | 13 ++---
 .../plugin/kubernetes/KubernetesCoordinator.java   | 39 ++++++++++----
 .../kubernetes/KubernetesCoordinatorTest.java      | 59 ++++++++++++++++++++--
 .../skywalking/oap/server/core/CoreModule.java     |  6 +--
 .../oap/server/core/CoreModuleConfig.java          |  2 +-
 .../oap/server/core/CoreModuleProvider.java        |  3 +-
 .../oap/server/core/config/ConfigService.java      | 37 ++++++++++++++
 .../oap/server/core/remote/client/Address.java     |  2 +-
 .../receiver/envoy/MetricServiceGRPCHandler.java   | 19 ++++++-
 10 files changed, 158 insertions(+), 28 deletions(-)

diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
index 2eb1592..af12c92 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
@@ -38,5 +38,11 @@
             <groupId>io.kubernetes</groupId>
             <artifactId>client-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>server-testing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
index f1d0d07..b7e3041 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
@@ -18,13 +18,9 @@
 
 package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 
-import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.NamespacedPodListWatch;
-import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.UidEnvSupplier;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
 import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
 /**
  * Use kubernetes to manage all instances in Skywalking cluster.
@@ -34,6 +30,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 public class ClusterModuleKubernetesProvider extends ModuleProvider {
 
     private final ClusterModuleKubernetesConfig config;
+    private KubernetesCoordinator coordinator;
 
     public ClusterModuleKubernetesProvider() {
         super();
@@ -53,7 +50,7 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
     }
 
     @Override public void prepare() throws ServiceNotProvidedException {
-        KubernetesCoordinator coordinator = new KubernetesCoordinator(
+        coordinator = new KubernetesCoordinator(getManager(),
             new NamespacedPodListWatch(config.getNamespace(), config.getLabelSelector(), config.getWatchTimeoutSeconds()),
             new UidEnvSupplier(config.getUidEnvName()));
         this.registerServiceImplementation(ClusterRegister.class, coordinator);
@@ -65,7 +62,7 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
     }
 
     @Override public void notifyAfterCompleted() {
-
+        coordinator.start();
     }
 
     @Override public String[] requiredModules() {
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 730f4b2..0485b47 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -18,20 +18,22 @@
 
 package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
 import org.slf4j.*;
 
 /**
- * Read collector pod info from api-server of kubernetes, then using all containerIp list to
- * construct the list of {@link RemoteInstance}.
+ * Read collector pod info from api-server of kubernetes, then using all containerIp list to construct the list of
+ * {@link RemoteInstance}.
  *
  * @author gaohongtao
  */
@@ -39,26 +41,33 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
 
     private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class);
 
+    private final ModuleDefineHolder manager;
+
     private final String uid;
 
     private final Map<String, RemoteInstance> cache = new ConcurrentHashMap<>();
 
     private final ReusableWatch<Event> watch;
 
-    private int port;
+    private volatile int port = -1;
 
-    KubernetesCoordinator(final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
+    KubernetesCoordinator(ModuleDefineHolder manager,
+        final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
+        this.manager = manager;
         this.watch = watch;
         this.uid = uidSupplier.get();
         TelemetryRelatedContext.INSTANCE.setId(uid);
     }
 
-    @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
-        this.port = remoteInstance.getAddress().getPort();
+    public void start() {
         submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
             .setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
     }
 
+    @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
+        this.port = remoteInstance.getAddress().getPort();
+    }
+
     private void submitTask(final ListeningExecutorService service) {
         watch.initOrReset();
         ListenableFuture<?> watchFuture = service.submit(newWatch());
@@ -102,7 +111,19 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
     }
 
     @Override public List<RemoteInstance> queryRemoteNodes() {
-        logger.debug("Query kubernetes remote nodes: {}", cache);
-        return Lists.newArrayList(cache.values());
+        final List<RemoteInstance> list = new ArrayList<>();
+        cache.values().forEach(instance -> {
+            Address address = instance.getAddress();
+            if (port == -1) {
+                logger.debug("Query kubernetes remote, port hasn't init, try to init");
+                ConfigService service = manager.find(CoreModule.NAME).provider().getService(ConfigService.class);
+                port = service.getGRPCPort();
+                logger.debug("Query kubernetes remote, port is set at {}", port);
+            }
+            list.add(new RemoteInstance(new Address(address.getHost(), port, address.isSelf())));
+        });
+
+        logger.debug("Query kubernetes remote nodes: {}", list);
+        return list;
     }
 }
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 4fd2699..982f4eb 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -19,12 +19,18 @@
 package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 
 import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.testing.module.*;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
 
 public class KubernetesCoordinatorTest {
 
@@ -33,7 +39,8 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertAdded() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -43,7 +50,8 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertModified() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -53,7 +61,8 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertDeleted() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(1));
@@ -63,10 +72,52 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertError() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
         assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
     }
+
+    @Test
+    public void assertModifiedInReceiverRole() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(2));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
+    }
+
+    @Test
+    public void assertDeletedInReceiverRole() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(1));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
+    }
+
+    @Test
+    public void assertErrorInReceiverRole() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+        coordinator.start();
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(2));
+        assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
+    }
+
+    public ModuleDefineHolder getManager() {
+        ModuleManagerTesting moduleManagerTesting = new ModuleManagerTesting();
+        ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
+        moduleManagerTesting.put(CoreModule.NAME, coreModuleDefine);
+        CoreModuleConfig config = Mockito.mock(CoreModuleConfig.class);
+        when(config.getGRPCHost()).thenReturn("127.0.0.1");
+        when(config.getGRPCPort()).thenReturn(8454);
+        coreModuleDefine.provider().registerServiceImplementation(ConfigService.class, new ConfigService(config));
+        return moduleManagerTesting;
+    }
 }
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 8eb7302..feaa10f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -28,9 +28,8 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
 import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
 import org.apache.skywalking.oap.server.core.server.*;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
 /**
  * @author peng-yongsheng
@@ -45,6 +44,7 @@ public class CoreModule extends ModuleDefine {
 
     @Override public Class[] services() {
         List<Class> classes = new ArrayList<>();
+        classes.add(ConfigService.class);
         classes.add(DownsamplingConfigService.class);
         classes.add(IComponentLibraryCatalogService.class);
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 35e3b49..31b59cd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
  */
 @Getter
 public class CoreModuleConfig extends ModuleConfig {
-    @Setter private String role;
+    @Setter private String role = "Mixed";
     @Setter private String nameSpace;
     @Setter private String restHost;
     @Setter private int restPort;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 52063c0..0cfc956 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -106,6 +106,7 @@ public class CoreModuleProvider extends ModuleProvider {
         jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors());
         jettyServer.initialize();
 
+        this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
         this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
 
         this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
@@ -176,7 +177,7 @@ public class CoreModuleProvider extends ModuleProvider {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        if (CoreModuleConfig.Role.Mixed.name().equals(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equals(moduleConfig.getRole())) {
+        if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {
             RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
             this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
new file mode 100644
index 0000000..36cf031
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.config;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.CoreModuleConfig;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author wusheng
+ */
+@Getter
+public class ConfigService implements Service {
+    private String gRPCHost;
+    private int gRPCPort;
+
+    public ConfigService(CoreModuleConfig moduleConfig) {
+        this.gRPCHost = moduleConfig.getGRPCHost();
+        this.gRPCPort = moduleConfig.getGRPCPort();
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
index 4e5d0ce..00c36cf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
@@ -28,7 +28,7 @@ import org.apache.skywalking.oap.server.core.Const;
 public class Address implements Comparable<Address> {
     private final String host;
     private final int port;
-    @Setter private boolean isSelf;
+    @Setter private volatile boolean isSelf;
 
     public Address(String host, int port, boolean isSelf) {
         this.host = host;
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
index 9f9c82e..6ade680 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
@@ -66,10 +66,13 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
             private int serviceInstanceId = Const.NONE;
 
             @Override public void onNext(StreamMetricsMessage message) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Received msg {}", message);
+                }
+
                 if (isFirst) {
                     isFirst = false;
                     StreamMetricsMessage.Identifier identifier = message.getIdentifier();
-                    logger.debug("Received identifier msg {}", identifier);
                     Node node = identifier.getNode();
                     if (node != null) {
                         String nodeId = node.getId();
@@ -110,6 +113,20 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
                                         timestamp = metric.getTimestampMs();
                                         value = metric.getGauge().getValue();
 
+                                        if (timestamp > 1000000000000000000L) {
+                                            /**
+                                             * Several versions of envoy in istio.deps send timestamp in nanoseconds,
+                                             * instead of milliseconds(protocol says).
+                                             *
+                                             * Sadly, but have to fix it forcedly.
+                                             *
+                                             * An example of timestamp is '1552303033488741055', clearly it is not in milliseconds.
+                                             *
+                                             * This should be removed in the future.
+                                             */
+                                            timestamp /= 1_000_000;
+                                        }
+
                                         EnvoyInstanceMetric metricSource = new EnvoyInstanceMetric();
                                         metricSource.setServiceId(serviceId);
                                         metricSource.setServiceName(serviceName);