You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/10/27 11:58:22 UTC
[incubator-skywalking] branch master updated: Add service and
instance reset function (#1790)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new b11d479 Add service and instance reset function (#1790)
b11d479 is described below
commit b11d47933714b04401bbdea3c9748abc8f23fb4e
Author: 刘新元 Liu XinYuan <87...@qq.com>
AuthorDate: Sat Oct 27 19:58:15 2018 +0800
Add service and instance reset function (#1790)
* Manually trigger agent registration and write registration status to file
* Delete unused variables
* Code optimization, encapsulating some methods
* add clear DataCarrier function
* revert SkyWalkingAgent
* Optimize code and logic, including updating applicatin_id in unRegisterOperationNames
* Server end metadata loss notification
* Server end metadata loss notification
* fix file stream close and other problem
* revert to old
* fix some bug
Signed-off-by: Liu-XinYuan <87...@qq.com>
* fix some bugs in agent reset
* add close inputStream and fileChannel and fix some bug
* fixed some bugs
---
.../apm/commons/datacarrier/DataCarrier.java | 6 +
.../apm/commons/datacarrier/buffer/Buffer.java | 6 +
.../skywalking/apm/agent/core/conf/Config.java | 18 ++-
.../agent/core/conf/SnifferConfigInitializer.java | 23 ++-
.../core/dictionary/NetworkAddressDictionary.java | 7 +-
.../core/dictionary/OperationNameDictionary.java | 11 ++
.../apm/agent/core/listener/ResetConfListener.java | 84 +++++++++++
.../apm/agent/core/listener/ResetStatus.java | 35 +++++
.../apm/agent/core/listener/Reseter.java | 159 +++++++++++++++++++++
.../core/remote/AppAndServiceRegisterClient.java | 49 ++++---
.../core/remote/TraceSegmentServiceClient.java | 9 +-
...ache.skywalking.apm.agent.core.boot.BootService | 1 +
.../apm/agent/core/boot/ServiceManagerTest.java | 2 +-
apm-sniffer/config/agent.config | 9 ++
.../service/ServiceInstanceInventoryRegister.java | 2 +-
.../src/main/resources/query-protocol | 2 +-
.../register/provider/RegisterModuleProvider.java | 7 -
.../handler/v5/ServiceInstancePingPkgHandler.java | 60 ++++++++
18 files changed, 445 insertions(+), 45 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 76ad609..bd9f9a6 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -152,4 +152,10 @@ public class DataCarrier<T> {
consumerPool.close();
}
}
+
+ public void clear() {
+ for (int i = 0; i < channels.getChannelSize(); i++) {
+ channels.getBuffer(i).clear();
+ }
+ }
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
index 4fbb478..d9cbb34 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
@@ -91,4 +91,10 @@ public class Buffer<T> {
return result;
}
+ public void clear() {
+ for (Object obj : buffer) {
+ obj = null;
+ }
+ }
+
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index dfdea53..c5e305b 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -16,7 +16,6 @@
*
*/
-
package org.apache.skywalking.apm.agent.core.conf;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
@@ -70,6 +69,21 @@ public class Config {
* Skywalking team may ask for these files in order to resolve compatible problem.
*/
public static boolean IS_OPEN_DEBUGGING_CLASS = false;
+
+ /**
+ * Specify register.status dir ,This is an option, the default is AGENT_HOME/option/reset.status.
+ */
+ public static String REGISTER_STATUS_DIR = "skywalking-agent/option";
+
+ /**
+ * Specify instance_uuid to ensure that the whole show is unique, for example: applicationName_ip_12
+ */
+ public static String INSTANCE_UUID = "";
+ /**
+ * enabled means that the reset function is enabled, and disabled means that the reset function is not enabled. A reset can be triggered by modifying the configuration file only if the reset feature is enabled.
+ */
+ public static String RESETER_LISTENER = "disabled";
+
}
public static class Collector {
@@ -80,7 +94,7 @@ public class Config {
/**
* application and service registry check interval
*/
- public static long APP_AND_SERVICE_REGISTER_CHECK_INTERVAL = 3;
+ public static long SERVICE_AND_ENDPOINT_REGISTER_CHECK_INTERVAL = 3;
/**
* Collector skywalking trace receiver service addresses.
*/
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/SnifferConfigInitializer.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/SnifferConfigInitializer.java
index d33d304..3d8ef63 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/SnifferConfigInitializer.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/SnifferConfigInitializer.java
@@ -41,24 +41,23 @@ import org.apache.skywalking.apm.util.StringUtil;
*/
public class SnifferConfigInitializer {
private static final ILog logger = LogManager.getLogger(SnifferConfigInitializer.class);
- private static String SPECIFIED_CONFIG_PATH = "skywalking_config";
- private static String DEFAULT_CONFIG_FILE_NAME = "/config/agent.config";
- private static String ENV_KEY_PREFIX = "skywalking.";
+ private static final String SPECIFIED_CONFIG_PATH = "skywalking_config";
+ private static final String DEFAULT_CONFIG_FILE_NAME = "/config/agent.config";
+ private static final String ENV_KEY_PREFIX = "skywalking.";
+ private static final String INSTANCE_UUID_NAME = "agent.instance_uuid";
+ private static final String REGISTER_STATUS_DIR = "agent.register_status_dir";
private static boolean IS_INIT_COMPLETED = false;
/**
- * If the specified agent config path is set, the agent will try to locate the specified agent config.
- * If the specified agent config path is not set , the agent will try to locate `agent.config`, which should be in the /config dictionary of agent package.
- * <p>
- * Also try to override the config by system.env and system.properties. All the keys in these two places should
- * start with {@link #ENV_KEY_PREFIX}. e.g. in env `skywalking.agent.application_code=yourAppName` to override
- * `agent.application_code` in config file.
- * <p>
- * At the end, `agent.application_code` and `collector.servers` must be not blank.
+ * If the specified agent config path is set, the agent will try to locate the specified agent config. If the
+ * specified agent config path is not set , the agent will try to locate `agent.config`, which should be in the
+ * /config dictionary of agent package. <p> Also try to override the config by system.env and system.properties. All
+ * the keys in these two places should start with {@link #ENV_KEY_PREFIX}. e.g. in env
+ * `skywalking.agent.application_code=yourAppName` to override `agent.application_code` in config file. <p> At the
+ * end, `agent.application_code` and `collector.servers` must be not blank.
*/
public static void initialize() throws ConfigNotFoundException, AgentPackageNotFoundException {
InputStreamReader configFileStream;
-
try {
configFileStream = loadConfig();
Properties properties = new Properties();
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/NetworkAddressDictionary.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/NetworkAddressDictionary.java
index 208ba17..ca4d3b5 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/NetworkAddressDictionary.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/NetworkAddressDictionary.java
@@ -16,7 +16,6 @@
*
*/
-
package org.apache.skywalking.apm.agent.core.dictionary;
import io.netty.util.internal.ConcurrentSet;
@@ -49,6 +48,12 @@ public enum NetworkAddressDictionary {
}
}
+ public void clearApplicationDictionary() {
+ unRegisterApplications.clear();
+ applicationDictionary.clear();
+
+ }
+
public void syncRemoteDictionary(
NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub) {
if (unRegisterApplications.size() > 0) {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/OperationNameDictionary.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/OperationNameDictionary.java
index 7f743b5..337bafb 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/OperationNameDictionary.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/OperationNameDictionary.java
@@ -61,6 +61,13 @@ public enum OperationNameDictionary {
}
}
+ public void clearOperationNameDictionary() {
+ unRegisterOperationNames.clear();
+ operationNameDictionary.clear();
+
+ }
+
+
public void syncRemoteDictionary(
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub) {
if (unRegisterOperationNames.size() > 0) {
@@ -106,6 +113,10 @@ public enum OperationNameDictionary {
return applicationId;
}
+ public void setApplicationId(int applicationId) {
+ this.applicationId = applicationId;
+ }
+
public String getOperationName() {
return operationName;
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/ResetConfListener.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/ResetConfListener.java
new file mode 100644
index 0000000..7360651
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/ResetConfListener.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.agent.core.listener;
+
+import java.io.File;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
+import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+
+/**
+ * @author liu-xinyuan
+ **/
+@DefaultImplementor
+public class ResetConfListener implements BootService, Runnable {
+ private static final ILog logger = LogManager.getLogger(ResetConfListener.class);
+ private File configFile = null;
+
+ @Override public void prepare() throws Throwable {
+
+ }
+
+ @Override public void boot() {
+ if ("enabled".equals(Config.Agent.RESETER_LISTENER)) {
+ Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ResetConfListener"))
+ .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
+ @Override
+ public void handle(Throwable t) {
+ logger.error("unexpected exception.", t);
+ }
+ }), 0, Config.Collector.SERVICE_AND_ENDPOINT_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
+
+ } else {
+ logger.info("Since the agent.register_status variable is not set correctly, the reset service is not started -> Agent reset service is inactive.");
+ }
+ }
+ @Override public void onComplete() throws Throwable {
+
+ }
+
+ @Override public void shutdown() throws Throwable {
+
+ }
+
+ @Override public void run() {
+ logger.debug("ResetConfListener running.");
+
+ try {
+ if (Reseter.INSTANCE.predicateReset())
+ Reseter.INSTANCE.setStatus(ResetStatus.DONE).clearID().reportToRegisterFile();
+ } catch (SecurityException e) {
+ logger.warn(e, "Denise read access to the file {}", configFile);
+ } catch (FileNotFoundException e) {
+ logger.warn(e, "not found file {}", configFile);
+ } catch (IOException e) {
+ logger.warn(e.getMessage());
+ }
+
+ }
+
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/ResetStatus.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/ResetStatus.java
new file mode 100644
index 0000000..ac9de48
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/ResetStatus.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.agent.core.listener;
+
+/**
+ * @author liu-xinyuan
+ **/
+public enum ResetStatus {
+ OFF("OFF"), ON("ON"), DONE("DONE");
+ private String label;
+
+ ResetStatus(String label) {
+ this.label = label;
+ }
+
+ public String value() {
+ return this.label;
+ }
+
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/Reseter.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/Reseter.java
new file mode 100644
index 0000000..cf1ecea
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/listener/Reseter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.agent.core.listener;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.Properties;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
+import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
+import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary;
+import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
+
+/**
+ * @author liu-xinyuan
+ **/
+public enum Reseter {
+ INSTANCE;
+ private static final ILog logger = LogManager.getLogger(Reseter.class);
+ private static final String APPLICATION_ID_NAM = "application_id";
+ private static final String INSTANCE_ID_NAME = "instance_id";
+ private static final String STATUS_NAME = "status";
+ private static final String RESET_CHILD_DIR = "/reset.status";
+ private static final String COMMENT = "#Status has three values: ON (trigger reset), DONE(reset complete), OFF(agent fist boot).\n" +
+ "Application_id: application_id of the current agent.\n" +
+ "Instance_id: the instanceid of the current agent.";
+ private volatile Properties properties = new Properties();
+ private String resetPath;
+ private ResetStatus status = ResetStatus.OFF;
+ private int isFirstRun = 0;
+ private int detectDuration = 5;
+
+ public Reseter setStatus(ResetStatus status) {
+ this.status = status;
+ return this;
+ }
+
+ public String getResetPath() throws IOException {
+ if (isFirstRun == 0) {
+ File statusDir = new File(Config.Agent.REGISTER_STATUS_DIR);
+
+ if (statusDir.exists() && statusDir.isDirectory()) {
+ resetPath = statusDir.getAbsolutePath() + RESET_CHILD_DIR;
+ } else {
+ statusDir.mkdir();
+ }
+ init();
+ isFirstRun = 1;
+ }
+ return resetPath;
+ }
+
+ public void reportToRegisterFile() throws IOException {
+ FileOutputStream outputStream = null;
+ try {
+ File configFile = new File(resetPath);
+ properties.setProperty(APPLICATION_ID_NAM, RemoteDownstreamConfig.Agent.APPLICATION_ID + "");
+ properties.setProperty(INSTANCE_ID_NAME, RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID + "");
+ properties.setProperty(STATUS_NAME, status.value());
+ outputStream = new FileOutputStream(configFile);
+ properties.store(outputStream, COMMENT);
+ } finally {
+ closeFileStream(outputStream);
+ }
+ }
+
+ public Reseter clearID() {
+ RemoteDownstreamConfig.Agent.APPLICATION_ID = DictionaryUtil.nullValue();
+ RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID = DictionaryUtil.nullValue();
+ OperationNameDictionary.INSTANCE.clearOperationNameDictionary();
+ NetworkAddressDictionary.INSTANCE.clearApplicationDictionary();
+ ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class).clearCache();
+ status = ResetStatus.DONE;
+ logger.info("clear id successfully,begin trigger reset.");
+ return this;
+ }
+
+ public Boolean predicateReset() throws IOException {
+ File resetFile = new File(getResetPath());
+ FileInputStream inputStream = null;
+ FileLock fileLock = null;
+ FileChannel fileChannel = null;
+ if (System.currentTimeMillis() - resetFile.lastModified() < detectDuration * 1000) {
+ try {
+ logger.info("The file reset.status was detected to have been modified in the last {} seconds.", detectDuration);
+ inputStream = new FileInputStream(resetFile);
+ fileChannel = inputStream.getChannel();
+ fileLock = fileChannel.tryLock(0, resetFile.length(), true);
+ if (fileLock == null) {
+ return false;
+ }
+ properties.clear();
+ properties.load(inputStream);
+ } finally {
+ fileLock.release();
+ fileChannel.close();
+ closeFileStream(inputStream);
+ }
+ if (properties.get(STATUS_NAME) != null && properties.getProperty(STATUS_NAME).equals(ResetStatus.ON.value())) {
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ public void init() throws IOException {
+ FileOutputStream outputStream = null;
+ try {
+ properties.setProperty(APPLICATION_ID_NAM, RemoteDownstreamConfig.Agent.APPLICATION_ID + "");
+ properties.setProperty(INSTANCE_ID_NAME, RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID + "");
+ properties.setProperty(STATUS_NAME, status.value());
+ File file = new File(resetPath);
+ if (!file.getParentFile().exists()) {
+ file.getParentFile().mkdir();
+ }
+ outputStream = new FileOutputStream(file);
+ properties.store(outputStream, COMMENT);
+ } finally {
+ closeFileStream(outputStream);
+ }
+ }
+
+ public void closeFileStream(Closeable stream) throws IOException {
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new IOException("file close failed.", e);
+ }
+ } else {
+ throw new IOException("create file outputstream failed");
+ }
+ }
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AppAndServiceRegisterClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AppAndServiceRegisterClient.java
index 3ce38d9..15459c6 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AppAndServiceRegisterClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AppAndServiceRegisterClient.java
@@ -31,16 +31,21 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary;
import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
+import org.apache.skywalking.apm.agent.core.listener.ResetStatus;
+import org.apache.skywalking.apm.agent.core.listener.Reseter;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.os.OSUtil;
import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.register.ServiceInstancePingGrpc;
+import org.apache.skywalking.apm.network.register.ServiceInstancePingPkg;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.util.StringUtil;
/**
* @author wusheng
@@ -48,13 +53,13 @@ import java.util.concurrent.TimeUnit;
@DefaultImplementor
public class AppAndServiceRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener {
private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
- private static final String PROCESS_UUID = UUID.randomUUID().toString().replaceAll("-", "");
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
private volatile InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
private volatile NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub;
+ private volatile ServiceInstancePingGrpc.ServiceInstancePingBlockingStub serviceInstancePingBlockingStub;
private volatile ScheduledFuture<?> applicationRegisterFuture;
private volatile long lastSegmentTime = -1;
@@ -66,6 +71,7 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
networkAddressRegisterServiceBlockingStub = NetworkAddressRegisterServiceGrpc.newBlockingStub(channel);
+ serviceInstancePingBlockingStub = ServiceInstancePingGrpc.newBlockingStub(channel);
} else {
applicationRegisterServiceBlockingStub = null;
instanceDiscoveryServiceBlockingStub = null;
@@ -82,13 +88,13 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
@Override
public void boot() throws Throwable {
applicationRegisterFuture = Executors
- .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
- .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
- @Override
- public void handle(Throwable t) {
- logger.error("unexpected exception.", t);
- }
- }), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
+ .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
+ .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
+ @Override
+ public void handle(Throwable t) {
+ logger.error("unexpected exception.", t);
+ }
+ }), 0, Config.Collector.SERVICE_AND_ENDPOINT_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
@@ -105,15 +111,17 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
public void run() {
logger.debug("AppAndServiceRegisterClient running, status:{}.", status);
boolean shouldTry = true;
+ String instanceUUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString().replaceAll("-", "") : Config.Agent.INSTANCE_UUID;
while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
shouldTry = false;
try {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
if (applicationRegisterServiceBlockingStub != null) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(
- Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
+ Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping != null) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication().getValue();
+ Reseter.INSTANCE.reportToRegisterFile();
shouldTry = true;
}
}
@@ -122,21 +130,24 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(ApplicationInstance.newBuilder()
- .setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
- .setAgentUUID(PROCESS_UUID)
- .setRegisterTime(System.currentTimeMillis())
- .setOsinfo(OSUtil.buildOSInfo())
- .build());
+ .setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
+ .setAgentUUID(instanceUUID)
+ .setRegisterTime(System.currentTimeMillis())
+ .setOsinfo(OSUtil.buildOSInfo())
+ .build());
if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
- = instanceMapping.getApplicationInstanceId();
+ = instanceMapping.getApplicationInstanceId();
+ Reseter.INSTANCE.setStatus(ResetStatus.OFF).reportToRegisterFile();
+
}
} else {
if (lastSegmentTime - System.currentTimeMillis() > 60 * 1000) {
- instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
- .setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
- .setHeartbeatTime(System.currentTimeMillis())
- .build());
+ serviceInstancePingBlockingStub.doPing(ServiceInstancePingPkg.newBuilder()
+ .setServiceInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
+ .setServiceInstanceUUID(instanceUUID)
+ .setTime(System.currentTimeMillis())
+ .build());
}
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(networkAddressRegisterServiceBlockingStub);
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 697d747..2c277fe 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -25,9 +25,11 @@ import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -155,7 +157,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
@Override
public void afterFinished(TraceSegment traceSegment) {
- if (traceSegment.isIgnore()) {
+ if (traceSegment.isIgnore() || RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue() || RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
return;
}
if (!carrier.produce(traceSegment)) {
@@ -173,4 +175,9 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
}
this.status = status;
}
+
+ public void clearCache() {
+ carrier.clear();
+ }
+
}
diff --git a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
index 9beec08..14bef1c 100644
--- a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
+++ b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
@@ -23,3 +23,4 @@ org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
+org.apache.skywalking.apm.agent.core.listener.ResetConfListener
\ No newline at end of file
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
index 3009f4b..46544f3 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
@@ -55,7 +55,7 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
- assertThat(registryService.size(), is(7));
+ assertThat(registryService.size(), is(8));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index fbf0c6b..6c16f8f 100644
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -43,3 +43,12 @@ collector.backend_service=127.0.0.1:10800
# Logging level
logging.level=DEBUG
+
+#Specify register.status dir,if dir not exists or it is a file then default AGENT_HOME/option
+#agent.register_status_dir=register_dir
+
+#Specify instance_uuid to ensure that the whole show is unique, for example: applicationName_ip_12
+#agent.instance_uuid = applicationName_ip_1
+
+#enabled means that the reset function is enabled, and disabled means that the reset function is not enabled. A reset can be triggered by modifying the configuration file only if the reset feature is enabled.
+#agent.reseter_listener = disabled
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
index af02992..1fd7851 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInstanceInventoryRegister.java
@@ -109,4 +109,4 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
logger.warn("Service instance {} heartbeat, but not found in storage.");
}
}
-}
+}
\ No newline at end of file
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index 1122e97..3a83be7 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit 1122e97b5604ae96447bd58ecdb248d7e02952aa
+Subproject commit 3a83be79a9c23aad6576ed2a4a04b82de6d7a829
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java
index 6a10141..d6a6f2f 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java
@@ -22,7 +22,6 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.register.module.RegisterModule;
-import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc.*;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.rest.*;
/**
@@ -46,12 +45,6 @@ public class RegisterModuleProvider extends ModuleProvider {
}
@Override public void start() {
- GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
- grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
- grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
- grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
- grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));
-
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/ServiceInstancePingPkgHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/ServiceInstancePingPkgHandler.java
new file mode 100644
index 0000000..15d50c2
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/ServiceInstancePingPkgHandler.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.common.Commands;
+import org.apache.skywalking.apm.network.register.ServiceInstancePingGrpc;
+import org.apache.skywalking.apm.network.register.ServiceInstancePingPkg;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServiceInstancePingPkgHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase implements GRPCHandler {
+ private static final Logger logger = LoggerFactory.getLogger(ServiceInstancePingPkgHandler.class);
+
+ private final IServiceInstanceInventoryCacheDAO instanceInventoryCacheDAO;
+ private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
+
+ public ServiceInstancePingPkgHandler(ModuleManager moduleManager) {
+ this.instanceInventoryCacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
+ this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
+
+ }
+
+ @Override public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) {
+
+ ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCacheDAO.get(request.getServiceInstanceId());
+ if (request.getServiceInstanceUUID().equals(serviceInstanceInventory.getName()) || serviceInstanceInventory.getServiceId() == Const.NONE) {
+ logger.error("Your metadata loss,please set the status in reset.status in the agent {} to ON to trigger a reset!", request.getServiceInstanceUUID());
+ } else {
+ serviceInstanceInventoryRegister.heartbeat(request.getServiceInstanceId(), request.getTime());
+ }
+
+ }
+}