You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2019/03/12 08:36:41 UTC
[incubator-skywalking] branch master updated: Make k8s-Coordinator
works in role mode. (#2347)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 0951b94 Make k8s-Coordinator works in role mode. (#2347)
0951b94 is described below
commit 0951b9434b6f043c99dab697dd6db629a6669178
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Mar 12 16:36:36 2019 +0800
Make k8s-Coordinator works in role mode. (#2347)
* Make k8s-Coordinator back in role mode.
* Start query API server after the core started.
* Do lock when port has been intialized.
* Rename service.
* Fix text cases.
* Declare ConfigService.class in core module.
* Implement in an easier way.
* Envoy print whole metric, now in debug.
* Forcedly fix envoy timestamp issue.
---
.../cluster-kubernetes-plugin/pom.xml | 6 +++
.../ClusterModuleKubernetesProvider.java | 13 ++---
.../plugin/kubernetes/KubernetesCoordinator.java | 39 ++++++++++----
.../kubernetes/KubernetesCoordinatorTest.java | 59 ++++++++++++++++++++--
.../skywalking/oap/server/core/CoreModule.java | 6 +--
.../oap/server/core/CoreModuleConfig.java | 2 +-
.../oap/server/core/CoreModuleProvider.java | 3 +-
.../oap/server/core/config/ConfigService.java | 37 ++++++++++++++
.../oap/server/core/remote/client/Address.java | 2 +-
.../receiver/envoy/MetricServiceGRPCHandler.java | 19 ++++++-
10 files changed, 158 insertions(+), 28 deletions(-)
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
index 2eb1592..af12c92 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
@@ -38,5 +38,11 @@
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>server-testing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
index f1d0d07..b7e3041 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
@@ -18,13 +18,9 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
-import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.NamespacedPodListWatch;
-import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.UidEnvSupplier;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* Use kubernetes to manage all instances in Skywalking cluster.
@@ -34,6 +30,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine;
public class ClusterModuleKubernetesProvider extends ModuleProvider {
private final ClusterModuleKubernetesConfig config;
+ private KubernetesCoordinator coordinator;
public ClusterModuleKubernetesProvider() {
super();
@@ -53,7 +50,7 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
}
@Override public void prepare() throws ServiceNotProvidedException {
- KubernetesCoordinator coordinator = new KubernetesCoordinator(
+ coordinator = new KubernetesCoordinator(getManager(),
new NamespacedPodListWatch(config.getNamespace(), config.getLabelSelector(), config.getWatchTimeoutSeconds()),
new UidEnvSupplier(config.getUidEnvName()));
this.registerServiceImplementation(ClusterRegister.class, coordinator);
@@ -65,7 +62,7 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
}
@Override public void notifyAfterCompleted() {
-
+ coordinator.start();
}
@Override public String[] requiredModules() {
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 730f4b2..0485b47 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -18,20 +18,22 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;
import javax.annotation.Nullable;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
import org.slf4j.*;
/**
- * Read collector pod info from api-server of kubernetes, then using all containerIp list to
- * construct the list of {@link RemoteInstance}.
+ * Read collector pod info from api-server of kubernetes, then using all containerIp list to construct the list of
+ * {@link RemoteInstance}.
*
* @author gaohongtao
*/
@@ -39,26 +41,33 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class);
+ private final ModuleDefineHolder manager;
+
private final String uid;
private final Map<String, RemoteInstance> cache = new ConcurrentHashMap<>();
private final ReusableWatch<Event> watch;
- private int port;
+ private volatile int port = -1;
- KubernetesCoordinator(final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
+ KubernetesCoordinator(ModuleDefineHolder manager,
+ final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
+ this.manager = manager;
this.watch = watch;
this.uid = uidSupplier.get();
TelemetryRelatedContext.INSTANCE.setId(uid);
}
- @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
- this.port = remoteInstance.getAddress().getPort();
+ public void start() {
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
}
+ @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
+ this.port = remoteInstance.getAddress().getPort();
+ }
+
private void submitTask(final ListeningExecutorService service) {
watch.initOrReset();
ListenableFuture<?> watchFuture = service.submit(newWatch());
@@ -102,7 +111,19 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
}
@Override public List<RemoteInstance> queryRemoteNodes() {
- logger.debug("Query kubernetes remote nodes: {}", cache);
- return Lists.newArrayList(cache.values());
+ final List<RemoteInstance> list = new ArrayList<>();
+ cache.values().forEach(instance -> {
+ Address address = instance.getAddress();
+ if (port == -1) {
+ logger.debug("Query kubernetes remote, port hasn't init, try to init");
+ ConfigService service = manager.find(CoreModule.NAME).provider().getService(ConfigService.class);
+ port = service.getGRPCPort();
+ logger.debug("Query kubernetes remote, port is set at {}", port);
+ }
+ list.add(new RemoteInstance(new Address(address.getHost(), port, address.isSelf())));
+ });
+
+ logger.debug("Query kubernetes remote nodes: {}", list);
+ return list;
}
}
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 4fd2699..982f4eb 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -19,12 +19,18 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
public class KubernetesCoordinatorTest {
@@ -33,7 +39,8 @@ public class KubernetesCoordinatorTest {
@Test
public void assertAdded() throws InterruptedException {
PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
- coordinator = new KubernetesCoordinator(watch, () -> "1");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -43,7 +50,8 @@ public class KubernetesCoordinatorTest {
@Test
public void assertModified() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
- coordinator = new KubernetesCoordinator(watch, () -> "1");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -53,7 +61,8 @@ public class KubernetesCoordinatorTest {
@Test
public void assertDeleted() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
- coordinator = new KubernetesCoordinator(watch, () -> "1");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(1));
@@ -63,10 +72,52 @@ public class KubernetesCoordinatorTest {
@Test
public void assertError() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
- coordinator = new KubernetesCoordinator(watch, () -> "1");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
+
+ @Test
+ public void assertModifiedInReceiverRole() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(2));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
+ }
+
+ @Test
+ public void assertDeletedInReceiverRole() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(1));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
+ }
+
+ @Test
+ public void assertErrorInReceiverRole() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
+ coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
+ coordinator.start();
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(2));
+ assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
+ }
+
+ public ModuleDefineHolder getManager() {
+ ModuleManagerTesting moduleManagerTesting = new ModuleManagerTesting();
+ ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
+ moduleManagerTesting.put(CoreModule.NAME, coreModuleDefine);
+ CoreModuleConfig config = Mockito.mock(CoreModuleConfig.class);
+ when(config.getGRPCHost()).thenReturn("127.0.0.1");
+ when(config.getGRPCPort()).thenReturn(8454);
+ coreModuleDefine.provider().registerServiceImplementation(ConfigService.class, new ConfigService(config));
+ return moduleManagerTesting;
+ }
}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 8eb7302..feaa10f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -28,9 +28,8 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* @author peng-yongsheng
@@ -45,6 +44,7 @@ public class CoreModule extends ModuleDefine {
@Override public Class[] services() {
List<Class> classes = new ArrayList<>();
+ classes.add(ConfigService.class);
classes.add(DownsamplingConfigService.class);
classes.add(IComponentLibraryCatalogService.class);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 35e3b49..31b59cd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
*/
@Getter
public class CoreModuleConfig extends ModuleConfig {
- @Setter private String role;
+ @Setter private String role = "Mixed";
@Setter private String nameSpace;
@Setter private String restHost;
@Setter private int restPort;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 52063c0..0cfc956 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -106,6 +106,7 @@ public class CoreModuleProvider extends ModuleProvider {
jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors());
jettyServer.initialize();
+ this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
@@ -176,7 +177,7 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
- if (CoreModuleConfig.Role.Mixed.name().equals(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equals(moduleConfig.getRole())) {
+ if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
new file mode 100644
index 0000000..36cf031
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.config;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.CoreModuleConfig;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author wusheng
+ */
+@Getter
+public class ConfigService implements Service {
+ private String gRPCHost;
+ private int gRPCPort;
+
+ public ConfigService(CoreModuleConfig moduleConfig) {
+ this.gRPCHost = moduleConfig.getGRPCHost();
+ this.gRPCPort = moduleConfig.getGRPCPort();
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
index 4e5d0ce..00c36cf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
@@ -28,7 +28,7 @@ import org.apache.skywalking.oap.server.core.Const;
public class Address implements Comparable<Address> {
private final String host;
private final int port;
- @Setter private boolean isSelf;
+ @Setter private volatile boolean isSelf;
public Address(String host, int port, boolean isSelf) {
this.host = host;
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
index 9f9c82e..6ade680 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java
@@ -66,10 +66,13 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
private int serviceInstanceId = Const.NONE;
@Override public void onNext(StreamMetricsMessage message) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received msg {}", message);
+ }
+
if (isFirst) {
isFirst = false;
StreamMetricsMessage.Identifier identifier = message.getIdentifier();
- logger.debug("Received identifier msg {}", identifier);
Node node = identifier.getNode();
if (node != null) {
String nodeId = node.getId();
@@ -110,6 +113,20 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
timestamp = metric.getTimestampMs();
value = metric.getGauge().getValue();
+ if (timestamp > 1000000000000000000L) {
+ /**
+ * Several versions of envoy in istio.deps send timestamp in nanoseconds,
+ * instead of milliseconds(protocol says).
+ *
+ * Sadly, but have to fix it forcedly.
+ *
+ * An example of timestamp is '1552303033488741055', clearly it is not in milliseconds.
+ *
+ * This should be removed in the future.
+ */
+ timestamp /= 1_000_000;
+ }
+
EnvoyInstanceMetric metricSource = new EnvoyInstanceMetric();
metricSource.setServiceId(serviceId);
metricSource.setServiceName(serviceName);