You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/01/09 01:35:54 UTC
[skywalking] branch master updated: Provide profile task downstream
to sniffer (#4172)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 4cf0600 Provide profile task downstream to sniffer (#4172)
4cf0600 is described below
commit 4cf0600f351f9c99af804bc90bcb54d3b778c7b8
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Jan 9 09:35:40 2020 +0800
Provide profile task downstream to sniffer (#4172)
* Provide profile task downstream to sniffer
* fix agent unit testcase issue
* add profile switch config on sniffer
* fix es error
* 1. add @DefaultImplementor on the sniffer profile task service
2. change ProfileTaskExecutionService#PROFILE_TASK_READY_SCHEDULE to final and remove volatile
3. fix style error
* change timeFromStartMills to use `<` for the comparison
* 1. add `maxSamplingCount` to profile task
2. make profile task limit to the common package
* 1. change `agent.active_profile` to `profile.active` and make true on default
2. add `maxSamplingCount` in profile task
3. use `createTime` to check has new command list
4. add task re-check before process profile task
* 1. add `profile-receiver` document
2. change `ProfileTaskExecutionService` use single schedule thread pool
3. the cache doesn't need to fetch data when there is none; use the auto-refresh mechanism only
* remove navigate time judge, fix comment wrong meaning
---
.../apm/network/constants/ProfileConstants.java | 39 ++--
.../component/command/CommandDeserializer.java | 2 +
.../component/command/ProfileTaskCommand.java | 128 +++++++++++++
apm-protocol/apm-network/src/main/proto | 2 +-
.../core/commands/CommandExecutorService.java | 5 +
.../executor/ProfileTaskCommandExecutor.java | 54 ++++++
.../skywalking/apm/agent/core/conf/Config.java | 11 ++
.../apm/agent/core/profile/ProfileTask.java | 125 +++++++++++++
.../core/profile/ProfileTaskExecutionContext.java | 61 +++++++
.../core/profile/ProfileTaskExecutionService.java | 201 +++++++++++++++++++++
.../core/profile/ProfileTaskQueryService.java | 117 ++++++++++++
...ache.skywalking.apm.agent.core.boot.BootService | 2 +
.../apm/agent/core/boot/ServiceManagerTest.java | 16 +-
apm-sniffer/config/agent.config | 3 +
dist-material/application.yml | 2 +
docker/oap/docker-entrypoint.sh | 2 +
docs/en/setup/backend/backend-receivers.md | 3 +
docs/en/setup/service-agent/java-agent/README.md | 2 +
oap-server/server-bootstrap/pom.xml | 5 +
.../src/test/resources/application.yml | 2 +
.../skywalking/oap/server/core/CoreModule.java | 7 +-
.../oap/server/core/CoreModuleConfig.java | 5 +
.../oap/server/core/CoreModuleProvider.java | 1 +
.../oap/server/core/cache/CacheUpdateTimer.java | 27 +++
.../oap/server/core/cache/ProfileTaskCache.java | 99 ++++++++++
.../oap/server/core/command/CommandService.java | 7 +
.../server/core/profile/ProfileTaskLogRecord.java | 86 +++++++++
.../core/profile/ProfileTaskMutationService.java | 31 ++--
.../server/core/profile/ProfileTaskNoneStream.java | 4 +
.../server/core/query/ProfileTaskQueryService.java | 42 ++++-
.../oap/server/core/query/entity/ProfileTask.java | 6 +
.../{ProfileTask.java => ProfileTaskLog.java} | 21 ++-
...eTask.java => ProfileTaskLogOperationType.java} | 53 ++++--
.../oap/server/core/source/DefaultScopeDefine.java | 1 +
.../oap/server/core/storage/StorageModule.java | 3 +-
.../storage/profile/IProfileTaskLogQueryDAO.java | 29 ++-
.../src/test/resources/application.yml | 2 +
.../query/graphql/resolver/ProfileMutation.java | 3 +-
.../graphql/type/ProfileTaskCreationRequest.java | 1 +
.../src/main/resources/query-protocol | 2 +-
oap-server/server-receiver-plugin/pom.xml | 1 +
.../skywalking-profile-receiver-plugin/pom.xml | 40 ++++
.../receiver/profile/module/ProfileModule.java | 28 ++-
.../profile/provider/ProfileModuleProvider.java | 67 +++++++
.../handler/ProfileTaskServiceHandler.java | 96 ++++++++++
...ywalking.oap.server.library.module.ModuleDefine | 11 +-
...alking.oap.server.library.module.ModuleProvider | 11 +-
.../StorageModuleElasticsearchProvider.java | 2 +
.../elasticsearch/query/ProfileTaskLogEsDAO.java | 82 +++++++++
.../elasticsearch/query/ProfileTaskQueryEsDAO.java | 4 +-
.../StorageModuleElasticsearch7Provider.java | 3 +
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 2 +
.../jdbc/h2/dao/H2ProfileTaskLogQueryDAO.java | 79 ++++++++
.../plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java | 4 +-
.../plugin/jdbc/mysql/MySQLStorageProvider.java | 2 +
test/e2e/e2e-mysql/src/docker/application.yml | 2 +
.../e2e-profile/e2e-profile-es-test-runner/pom.xml | 1 +
.../skywalking/e2e/ProfileVerificationITCase.java | 42 +++--
.../e2e-profile/e2e-profile-h2-test-runner/pom.xml | 1 +
.../skywalking/e2e/ProfileVerificationITCase.java | 40 ++--
.../e2e-profile-mysql-test-runner/pom.xml | 1 +
.../src/docker/application.yml | 2 +
.../skywalking/e2e/ProfileVerificationITCase.java | 42 +++--
.../skywalking/e2e/profile/ProfileClient.java | 3 +-
.../creation/ProfileTaskCreationRequest.java | 1 +
.../skywalking/e2e/profile/query/ProfileTask.java | 5 +
.../{ProfileTask.java => ProfileTaskLog.java} | 14 +-
...ProfileTask.java => ProfileTaskLogMatcher.java} | 24 +--
.../e2e/profile/query/ProfileTaskMatcher.java | 13 ++
....e2e.ProfileVerificationITCase.profileTasks.yml | 6 +
.../src/main/resources/getProfileTaskList.gql | 7 +
.../src/main/resources/profileTaskCreation.gql | 3 +-
72 files changed, 1670 insertions(+), 181 deletions(-)
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/constants/ProfileConstants.java
similarity index 56%
copy from test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
copy to apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/constants/ProfileConstants.java
index 78323c9..6e6f7ac 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
+++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/constants/ProfileConstants.java
@@ -15,28 +15,33 @@
* limitations under the License.
*
*/
-
-package org.apache.skywalking.e2e.profile.creation;
-
-import lombok.*;
+package org.apache.skywalking.apm.network.constants;
/**
- * e2e profile, create profile task entity
+ * profile task limit constants
*
* @author MrPro
*/
-@Setter
-@Getter
-@NoArgsConstructor
-@AllArgsConstructor
-@Builder
-public class ProfileTaskCreationRequest {
+public class ProfileConstants {
+
+ /**
+ * Minimum monitor duration of a profile task, in minutes (a task duration must be at least this value)
+ */
+ public static final int TASK_DURATION_MIN_MINUTE = 1;
+
+ /**
+ * Maximum monitor duration of a profile task, in minutes (a task duration cannot be greater than this value)
+ */
+ public static final int TASK_DURATION_MAX_MINUTE = 15;
+
+ /**
+ * Minimum thread dump period, in milliseconds (a dump period must be greater than or equal to this value)
+ */
+ public static final int TASK_DUMP_PERIOD_MIN_MILLIS = 10;
- private int serviceId;
- private String endpointName;
- private long startTime;
- private int duration;
- private int minDurationThreshold;
- private int dumpPeriod;
+ /**
+ * Exclusive upper bound of the max sampling count (a max sampling count must be less than this value)
+ */
+ public static final int TASK_MAX_SAMPLING_COUNT = 10;
}
diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java
index db6478f..7824f49 100644
--- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java
+++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java
@@ -28,6 +28,8 @@ public class CommandDeserializer {
final String commandName = command.getCommand();
if (ServiceResetCommand.NAME.equals(commandName)) {
return ServiceResetCommand.DESERIALIZER.deserialize(command);
+ } else if (ProfileTaskCommand.NAME.equals(commandName)) {
+ return ProfileTaskCommand.DESERIALIZER.deserialize(command);
}
throw new UnsupportedCommandException(command);
}
diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
new file mode 100644
index 0000000..fecb876
--- /dev/null
+++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.network.trace.component.command;
+
+import org.apache.skywalking.apm.network.common.Command;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
+
+import java.util.List;
+
+/**
+ * Command carrying the definition of one profile task. It can be written into a {@link Command} builder through
+ * {@link #serialize()} and rebuilt from the arguments of a {@link Command} through {@link #deserialize(Command)}.
+ *
+ * @author MrPro
+ */
+public class ProfileTaskCommand extends BaseCommand implements Serializable, Deserializable<ProfileTaskCommand> {
+    public static final Deserializable<ProfileTaskCommand> DESERIALIZER = new ProfileTaskCommand("", "", 0, 0, 0, 0, 0, 0);
+    public static final String NAME = "ProfileTaskQuery";
+
+    // profile task data
+    private String endpointName;
+    private int duration;
+    private int minDurationThreshold;
+    private int dumpPeriod;
+    private int maxSamplingCount;
+    private long startTime;
+    private long createTime;
+
+    /**
+     * Creates a command named {@link #NAME} with the given serial number and profile task values.
+     */
+    public ProfileTaskCommand(String serialNumber, String endpointName, int duration, int minDurationThreshold, int dumpPeriod, int maxSamplingCount, long startTime, long createTime) {
+        super(NAME, serialNumber);
+        this.endpointName = endpointName;
+        this.duration = duration;
+        this.minDurationThreshold = minDurationThreshold;
+        this.dumpPeriod = dumpPeriod;
+        this.maxSamplingCount = maxSamplingCount;
+        this.startTime = startTime;
+        this.createTime = createTime;
+    }
+
+    /**
+     * Rebuilds a profile task command from the key/value arguments of the given command.
+     * Arguments that are absent keep their defaults ({@code null} for strings, {@code 0} for numbers);
+     * unknown keys are ignored.
+     *
+     * @param command the received command whose argument list is read
+     * @return a new command populated from the recognized arguments
+     */
+    @Override
+    public ProfileTaskCommand deserialize(Command command) {
+        String parsedSerialNumber = null;
+        String parsedEndpointName = null;
+        int parsedDuration = 0;
+        int parsedMinDurationThreshold = 0;
+        int parsedDumpPeriod = 0;
+        int parsedMaxSamplingCount = 0;
+        long parsedStartTime = 0;
+        long parsedCreateTime = 0;
+
+        final List<KeyStringValuePair> arguments = command.getArgsList();
+        for (final KeyStringValuePair argument : arguments) {
+            final String key = argument.getKey();
+            if ("SerialNumber".equals(key)) {
+                parsedSerialNumber = argument.getValue();
+            } else if ("EndpointName".equals(key)) {
+                parsedEndpointName = argument.getValue();
+            } else if ("Duration".equals(key)) {
+                parsedDuration = Integer.parseInt(argument.getValue());
+            } else if ("MinDurationThreshold".equals(key)) {
+                parsedMinDurationThreshold = Integer.parseInt(argument.getValue());
+            } else if ("DumpPeriod".equals(key)) {
+                parsedDumpPeriod = Integer.parseInt(argument.getValue());
+            } else if ("MaxSamplingCount".equals(key)) {
+                parsedMaxSamplingCount = Integer.parseInt(argument.getValue());
+            } else if ("StartTime".equals(key)) {
+                parsedStartTime = Long.parseLong(argument.getValue());
+            } else if ("CreateTime".equals(key)) {
+                parsedCreateTime = Long.parseLong(argument.getValue());
+            }
+        }
+
+        return new ProfileTaskCommand(parsedSerialNumber, parsedEndpointName, parsedDuration, parsedMinDurationThreshold,
+            parsedDumpPeriod, parsedMaxSamplingCount, parsedStartTime, parsedCreateTime);
+    }
+
+    /**
+     * Writes the profile task values as string arguments onto the builder returned by {@code commandBuilder()}.
+     *
+     * @return the command builder with one argument appended per profile task field
+     */
+    @Override
+    public Command.Builder serialize() {
+        final Command.Builder commandBuilder = commandBuilder();
+        commandBuilder.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName));
+        commandBuilder.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)));
+        commandBuilder.addArgs(KeyStringValuePair.newBuilder().setKey("MinDurationThreshold").setValue(String.valueOf(minDurationThreshold)));
+        commandBuilder.addArgs(KeyStringValuePair.newBuilder().setKey("DumpPeriod").setValue(String.valueOf(dumpPeriod)));
+        commandBuilder.addArgs(KeyStringValuePair.newBuilder().setKey("MaxSamplingCount").setValue(String.valueOf(maxSamplingCount)));
+        commandBuilder.addArgs(KeyStringValuePair.newBuilder().setKey("StartTime").setValue(String.valueOf(startTime)));
+        commandBuilder.addArgs(KeyStringValuePair.newBuilder().setKey("CreateTime").setValue(String.valueOf(createTime)));
+        return commandBuilder;
+    }
+
+    public String getEndpointName() {
+        return endpointName;
+    }
+
+    public int getDuration() {
+        return duration;
+    }
+
+    public int getMinDurationThreshold() {
+        return minDurationThreshold;
+    }
+
+    public int getDumpPeriod() {
+        return dumpPeriod;
+    }
+
+    public int getMaxSamplingCount() {
+        return maxSamplingCount;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public long getCreateTime() {
+        return createTime;
+    }
+}
diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto
index c711378..8f897e8 160000
--- a/apm-protocol/apm-network/src/main/proto
+++ b/apm-protocol/apm-network/src/main/proto
@@ -1 +1 @@
-Subproject commit c7113782a74858bae14ade4abc13653f26bf304a
+Subproject commit 8f897e825d3c27dd661b255fb59096932705894c
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java
index fd748ba..09db961 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java
@@ -20,8 +20,10 @@ package org.apache.skywalking.apm.agent.core.commands;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.commands.executor.NoopCommandExecutor;
+import org.apache.skywalking.apm.agent.core.commands.executor.ProfileTaskCommandExecutor;
import org.apache.skywalking.apm.agent.core.commands.executor.ServiceResetCommandExecutor;
import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
+import org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand;
import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand;
import java.util.HashMap;
@@ -49,6 +51,9 @@ public class CommandExecutorService implements BootService, CommandExecutor {
// Register all the supported commands with their executors here
commandExecutorMap.put(ServiceResetCommand.NAME, new ServiceResetCommandExecutor());
+
+ // Profile task executor
+ commandExecutorMap.put(ProfileTaskCommand.NAME, new ProfileTaskCommandExecutor());
}
@Override
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
new file mode 100644
index 0000000..c6c22d2
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.commands.executor;
+
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.commands.CommandExecutionException;
+import org.apache.skywalking.apm.agent.core.commands.CommandExecutor;
+import org.apache.skywalking.apm.agent.core.profile.ProfileTask;
+import org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
+import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
+import org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand;
+
+/**
+ * Command executor for the {@link ProfileTaskCommand}: converts the command into a {@link ProfileTask}
+ * and hands it over to the {@link ProfileTaskExecutionService}.
+ *
+ * @author MrPro
+ */
+public class ProfileTaskCommandExecutor implements CommandExecutor {
+
+    /**
+     * Copies every field of the received profile task command into a new {@link ProfileTask}
+     * and registers that task with the execution service.
+     *
+     * @param command the command to execute; it is cast to {@link ProfileTaskCommand}
+     */
+    @Override
+    public void execute(BaseCommand command) throws CommandExecutionException {
+        final ProfileTaskCommand received = (ProfileTaskCommand) command;
+
+        // translate the command into the agent-side task bean
+        final ProfileTask task = new ProfileTask();
+        task.setEndpointName(received.getEndpointName());
+        task.setStartTime(received.getStartTime());
+        task.setCreateTime(received.getCreateTime());
+        task.setDuration(received.getDuration());
+        task.setMinDurationThreshold(received.getMinDurationThreshold());
+        task.setThreadDumpPeriod(received.getDumpPeriod());
+        task.setMaxSamplingCount(received.getMaxSamplingCount());
+
+        // hand the task to the execution service
+        final ProfileTaskExecutionService executionService = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
+        executionService.addProfileTask(task);
+    }
+
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 12e0201..ff12bdb 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -137,6 +137,17 @@ public class Config {
* How long grpc client will timeout in sending data to upstream.
*/
public static int GRPC_UPSTREAM_TIMEOUT = 30;
+ /**
+ * Get profile task list interval
+ */
+ public static int GET_PROFILE_TASK_INTERVAL = 20;
+ }
+
+ public static class Profile {
+ /**
+ * If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.
+ */
+ public static boolean ACTIVE = true;
}
public static class Jvm {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
new file mode 100644
index 0000000..8669c7d
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import java.util.Objects;
+
+/**
+ * Profile task bean, received from the OAP server.
+ * <p>
+ * Plain mutable data holder. {@link #equals(Object)} and {@link #hashCode()} take every field into account
+ * and tolerate an endpoint name that has not been set.
+ *
+ * @author MrPro
+ */
+public class ProfileTask {
+
+    // monitor endpoint name
+    private String endpointName;
+
+    // task duration (minute)
+    private int duration;
+
+    // trace start monitoring time (ms)
+    private int minDurationThreshold;
+
+    // thread dump period (ms)
+    private int threadDumpPeriod;
+
+    // max number of traces monitor on the sniffer
+    private int maxSamplingCount;
+
+    // task start time
+    private long startTime;
+
+    // task create time
+    private long createTime;
+
+    public String getEndpointName() {
+        return endpointName;
+    }
+
+    public void setEndpointName(String endpointName) {
+        this.endpointName = endpointName;
+    }
+
+    public int getDuration() {
+        return duration;
+    }
+
+    public void setDuration(int duration) {
+        this.duration = duration;
+    }
+
+    public int getMinDurationThreshold() {
+        return minDurationThreshold;
+    }
+
+    public void setMinDurationThreshold(int minDurationThreshold) {
+        this.minDurationThreshold = minDurationThreshold;
+    }
+
+    public int getThreadDumpPeriod() {
+        return threadDumpPeriod;
+    }
+
+    public void setThreadDumpPeriod(int threadDumpPeriod) {
+        this.threadDumpPeriod = threadDumpPeriod;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public int getMaxSamplingCount() {
+        return maxSamplingCount;
+    }
+
+    public void setMaxSamplingCount(int maxSamplingCount) {
+        this.maxSamplingCount = maxSamplingCount;
+    }
+
+    public long getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(long createTime) {
+        this.createTime = createTime;
+    }
+
+    /**
+     * Two tasks are equal when all of their fields are equal.
+     *
+     * @param o the object to compare with
+     * @return {@code true} when {@code o} is a {@code ProfileTask} with identical field values
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ProfileTask that = (ProfileTask) o;
+        return duration == that.duration
+            && minDurationThreshold == that.minDurationThreshold
+            && threadDumpPeriod == that.threadDumpPeriod
+            && maxSamplingCount == that.maxSamplingCount
+            && startTime == that.startTime
+            && createTime == that.createTime
+            // null-safe comparison: endpointName is null until setEndpointName is called,
+            // and hashCode() below already accepts a null endpoint name
+            && Objects.equals(endpointName, that.endpointName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(endpointName, duration, minDurationThreshold, threadDumpPeriod, maxSamplingCount, startTime, createTime);
+    }
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
new file mode 100644
index 0000000..20b9eec
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import java.util.Objects;
+
+/**
+ * Execution context of a profile task, created when the task is processed.
+ * Equality and hash code are derived from the wrapped task only; the start time is not part of them.
+ *
+ * @author MrPro
+ */
+public class ProfileTaskExecutionContext {
+
+    // the profile task this context belongs to
+    private final ProfileTask task;
+
+    // the moment the task really started
+    private final long startTime;
+
+    /**
+     * @param task      the profile task being executed
+     * @param startTime the real start time of the task
+     */
+    public ProfileTaskExecutionContext(ProfileTask task, long startTime) {
+        this.startTime = startTime;
+        this.task = task;
+    }
+
+    public ProfileTask getTask() {
+        return this.task;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null) {
+            return false;
+        }
+        if (o.getClass() != getClass()) {
+            return false;
+        }
+        final ProfileTaskExecutionContext other = (ProfileTaskExecutionContext) o;
+        return Objects.equals(this.task, other.task);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.task);
+    }
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
new file mode 100644
index 0000000..0bf0f50
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
+import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.network.constants.ProfileConstants;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Profile task executor, use {@link #addProfileTask(ProfileTask)} to add a new profile task.
+ * <p>
+ * Accepted tasks are kept in {@link #profileTaskList} and started/stopped by a single shared schedule thread.
+ * At most one task is active at a time; starting a task stops the one that is currently active.
+ *
+ * @author MrPro
+ */
+@DefaultImplementor
+public class ProfileTaskExecutionService implements BootService {
+
+    private static final ILog logger = LogManager.getLogger(ProfileTaskExecutionService.class);
+
+    // add a schedule while waiting for the task to start or finish
+    private final static ScheduledExecutorService PROFILE_TASK_SCHEDULE = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("PROFILE-TASK-SCHEDULE"));
+
+    // last command create time, use to next query task list
+    private volatile long lastCommandCreateTime = -1;
+
+    // current processing profile task context
+    private final AtomicReference<ProfileTaskExecutionContext> taskExecutionContext = new AtomicReference<>();
+
+    // profile task list, include running and waiting running tasks
+    private final List<ProfileTask> profileTaskList = Collections.synchronizedList(new LinkedList<>());
+
+    /**
+     * Registers a profile task received from OAP.
+     * <p>
+     * The last command create time is updated first, even when the task is rejected afterwards.
+     * A task that fails {@link #checkProfileTaskSuccess(ProfileTask)} is logged and dropped; an accepted task is
+     * stored and scheduled to start at its start time.
+     *
+     * @param task the task to register
+     */
+    public void addProfileTask(ProfileTask task) {
+        // update last command create time
+        if (task.getCreateTime() > lastCommandCreateTime) {
+            lastCommandCreateTime = task.getCreateTime();
+        }
+
+        // check profile task limit
+        final String dataError = checkProfileTaskSuccess(task);
+        if (dataError != null) {
+            logger.warn("check command error, cannot process this profile task. reason: {}", dataError);
+            return;
+        }
+
+        // add task to list
+        profileTaskList.add(task);
+
+        // schedule to start task; the delay is the remaining time until the task start time
+        long timeToProcessMills = task.getStartTime() - System.currentTimeMillis();
+        PROFILE_TASK_SCHEDULE.schedule(new Runnable() {
+            @Override
+            public void run() {
+                processProfileTask(task);
+            }
+        }, timeToProcessMills, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Activates the selected profile task as the executing task, and schedules its removal
+     * after the task duration (minutes) has elapsed.
+     *
+     * @param task the task to activate
+     */
+    private synchronized void processProfileTask(ProfileTask task) {
+        // make sure prev profile task already stopped
+        stopCurrentProfileTask(taskExecutionContext.get());
+
+        // make stop task schedule and task context
+        // TODO process task on next step
+        final ProfileTaskExecutionContext currentStartedTaskContext = new ProfileTaskExecutionContext(task, System.currentTimeMillis());
+        taskExecutionContext.set(currentStartedTaskContext);
+
+        PROFILE_TASK_SCHEDULE.schedule(new Runnable() {
+            @Override
+            public void run() {
+                stopCurrentProfileTask(currentStartedTaskContext);
+            }
+        }, task.getDuration(), TimeUnit.MINUTES);
+    }
+
+    /**
+     * Stops a profile task and removes its context data.
+     * Nothing happens when {@code needToStop} is {@code null} or is no longer the active context.
+     *
+     * @param needToStop the context expected to be the active one
+     */
+    private synchronized void stopCurrentProfileTask(ProfileTaskExecutionContext needToStop) {
+        // stop same context only
+        if (needToStop == null || !taskExecutionContext.compareAndSet(needToStop, null)) {
+            return;
+        }
+
+        // remove task
+        profileTaskList.remove(needToStop.getTask());
+
+        // TODO notify OAP current profile task execute finish
+    }
+
+    @Override
+    public void prepare() throws Throwable {
+
+    }
+
+    @Override
+    public void boot() throws Throwable {
+
+    }
+
+    @Override
+    public void onComplete() throws Throwable {
+
+    }
+
+    /**
+     * Shuts down the shared schedule executor.
+     */
+    @Override
+    public void shutdown() throws Throwable {
+        PROFILE_TASK_SCHEDULE.shutdown();
+    }
+
+    public long getLastCommandCreateTime() {
+        return lastCommandCreateTime;
+    }
+
+    /**
+     * Re-checks the profile task data, to prevent processing wrong data received from database or OAP.
+     *
+     * @param task the task to validate
+     * @return the reason why the task is rejected, or {@code null} when the task is acceptable
+     */
+    private String checkProfileTaskSuccess(ProfileTask task) {
+        // endpoint name
+        if (StringUtil.isEmpty(task.getEndpointName())) {
+            return "endpoint name cannot be empty";
+        }
+
+        // duration
+        if (task.getDuration() < ProfileConstants.TASK_DURATION_MIN_MINUTE) {
+            return "monitor duration must greater than " + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes";
+        }
+        if (task.getDuration() > ProfileConstants.TASK_DURATION_MAX_MINUTE) {
+            return "The duration of the monitoring task cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes";
+        }
+
+        // min duration threshold
+        if (task.getMinDurationThreshold() < 0) {
+            return "min duration threshold must greater than or equals zero";
+        }
+
+        // dump period
+        if (task.getThreadDumpPeriod() < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS) {
+            return "dump period must be greater than or equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds";
+        }
+
+        // max sampling count
+        if (task.getMaxSamplingCount() <= 0) {
+            return "max sampling count must greater than zero";
+        }
+        if (task.getMaxSamplingCount() >= ProfileConstants.TASK_MAX_SAMPLING_COUNT) {
+            return "max sampling count must less than " + ProfileConstants.TASK_MAX_SAMPLING_COUNT;
+        }
+
+        // check task queue, check only one task in a certain time
+        long taskProcessFinishTime = calcProfileTaskFinishTime(task);
+        // A synchronized list does not guard iteration: hold the list monitor while traversing, otherwise a
+        // remove() issued by the schedule thread during the loop can raise ConcurrentModificationException.
+        synchronized (profileTaskList) {
+            for (ProfileTask profileTask : profileTaskList) {
+
+                // if the end time of the task to be added is during the execution of any data, means is a error data
+                // NOTE(review): only the finish time of the new task is tested; a new task that starts inside an
+                // existing task's range but finishes after it is not rejected here - confirm this is intended.
+                if (taskProcessFinishTime >= profileTask.getStartTime() && taskProcessFinishTime <= calcProfileTaskFinishTime(profileTask)) {
+                    return "there already have processing task in time range, could not add a new task again. processing task monitor endpoint name: " + profileTask.getEndpointName();
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param task the task whose finish time is computed
+     * @return the task start time plus its duration (minutes) converted to milliseconds
+     */
+    private long calcProfileTaskFinishTime(ProfileTask task) {
+        return task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration());
+    }
+
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java
new file mode 100644
index 0000000..ef7fdb4
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import io.grpc.Channel;
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
+import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.commands.CommandService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
+import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
+import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
+import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
+import org.apache.skywalking.apm.network.common.Commands;
+import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
+import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
+/**
+ * Polls the collector for new profile tasks. When {@link Config.Profile#ACTIVE} is set, the query runs every
+ * {@link Config.Collector#GET_PROFILE_TASK_INTERVAL} seconds, and the commands returned by the collector are
+ * handed over to the {@link CommandService}.
+ *
+ * @author MrPro
+ */
+@DefaultImplementor
+public class ProfileTaskQueryService implements BootService, Runnable, GRPCChannelListener {
+    private static final ILog logger = LogManager.getLogger(ProfileTaskQueryService.class);
+
+    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
+    private volatile ProfileTaskGrpc.ProfileTaskBlockingStub profileTaskBlockingStub;
+    private volatile ScheduledFuture<?> getTaskListFuture;
+
+    /**
+     * One polling round. Nothing is sent while the service id or instance id still holds the dictionary null
+     * value, or while the gRPC channel is not connected. Any failure is logged and swallowed so that the
+     * scheduled task keeps running.
+     */
+    @Override
+    public void run() {
+        if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()
+            || RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
+            return;
+        }
+
+        // Work on a snapshot of the stub: statusChanged() clears the field before it updates the status,
+        // so the status may still read CONNECTED while the field is already null.
+        final ProfileTaskGrpc.ProfileTaskBlockingStub stub = profileTaskBlockingStub;
+        if (status != GRPCChannelStatus.CONNECTED || stub == null) {
+            return;
+        }
+
+        try {
+            ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();
+
+            // sniffer info
+            builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
+
+            // last command create time
+            builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime());
+
+            Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).getProfileTaskCommands(builder.build());
+            ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
+        } catch (Throwable t) {
+            // the message has no placeholder, so the throwable is only passed as the cause
+            logger.error(t, "query profile task from Collector fail.");
+        }
+    }
+
+    /**
+     * Registers this service as a listener of the gRPC channel status.
+     */
+    @Override
+    public void prepare() throws Throwable {
+        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
+    }
+
+    /**
+     * Schedules the periodic query, but only when profiling is switched on.
+     */
+    @Override
+    public void boot() throws Throwable {
+        if (Config.Profile.ACTIVE) {
+            getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileGetTaskService"))
+                .scheduleWithFixedDelay(this, 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS);
+        }
+    }
+
+    @Override
+    public void onComplete() throws Throwable {
+    }
+
+    /**
+     * Cancels the scheduled query if it was started.
+     * NOTE(review): the executor created in boot() is not retained, so only the task is cancelled here.
+     */
+    @Override
+    public void shutdown() throws Throwable {
+        if (getTaskListFuture != null) {
+            getTaskListFuture.cancel(true);
+        }
+    }
+
+    /**
+     * Rebuilds the blocking stub on the current channel when the channel becomes connected,
+     * and drops the stub on any other status.
+     *
+     * @param status the new status of the gRPC channel
+     */
+    @Override
+    public void statusChanged(GRPCChannelStatus status) {
+        if (GRPCChannelStatus.CONNECTED.equals(status)) {
+            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
+            profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel);
+        } else {
+            profileTaskBlockingStub = null;
+        }
+        this.status = status;
+    }
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
index 7dbc784..826b032 100644
--- a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
+++ b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
@@ -26,3 +26,5 @@ org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
org.apache.skywalking.apm.agent.core.context.OperationNameFormatService
+org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService
+org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
index ff5b3a8..0390dcc 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
@@ -27,6 +27,8 @@ import org.apache.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.jvm.JVMService;
+import org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
+import org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
@@ -55,13 +57,15 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
- assertThat(registryService.size(), is(10));
+ assertThat(registryService.size(), is(12));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
assertGRPCChannelManager(ServiceManager.INSTANCE.findService(GRPCChannelManager.class));
assertSamplingService(ServiceManager.INSTANCE.findService(SamplingService.class));
assertJVMService(ServiceManager.INSTANCE.findService(JVMService.class));
+ assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskQueryService.class));
+ assertProfileTaskExecuteService(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class));
assertTracingContextListener();
assertIgnoreTracingContextListener();
@@ -83,11 +87,19 @@ public class ServiceManagerTest {
assertNotNull(service);
}
+ // Only checks that a non-null ProfileTaskQueryService was handed in.
+ private void assertProfileTaskQueryService(ProfileTaskQueryService service) {
+ assertNotNull(service);
+ }
+
+ // Only checks that a non-null ProfileTaskExecutionService was handed in.
+ private void assertProfileTaskExecuteService(ProfileTaskExecutionService service) {
+ assertNotNull(service);
+ }
+
private void assertGRPCChannelManager(GRPCChannelManager service) throws Exception {
assertNotNull(service);
List<GRPCChannelListener> listeners = getFieldValue(service, "listeners");
- assertEquals(listeners.size(), 3);
+ assertEquals(listeners.size(), 4);
}
private void assertSamplingService(SamplingService service) {
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index 3687de5..acd369b 100644
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -41,6 +41,9 @@ agent.service_name=${SW_AGENT_NAME:Your_ApplicationName}
# The operationName max length
# agent.operation_name_threshold=${SW_AGENT_OPERATION_NAME_THRESHOLD:500}
+# If true, the SkyWalking agent will enable profiling when the user creates a new profile task. Otherwise, profiling is disabled.
+# profile.active=${SW_AGENT_PROFILE_ACTIVE:true}
+
# Backend service addresses.
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
diff --git a/dist-material/application.yml b/dist-material/application.yml
index 15763bc..25e165a 100644
--- a/dist-material/application.yml
+++ b/dist-material/application.yml
@@ -151,6 +151,8 @@ receiver-jvm:
default:
receiver-clr:
default:
+receiver-profile:
+ default:
service-mesh:
default:
bufferPath: ${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
diff --git a/docker/oap/docker-entrypoint.sh b/docker/oap/docker-entrypoint.sh
index 8ffee25..8fb2abd 100755
--- a/docker/oap/docker-entrypoint.sh
+++ b/docker/oap/docker-entrypoint.sh
@@ -355,6 +355,8 @@ receiver-clr:
default:
receiver-so11y:
default:
+receiver-profile:
+ default:
service-mesh:
default:
bufferPath: \${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
diff --git a/docs/en/setup/backend/backend-receivers.md b/docs/en/setup/backend/backend-receivers.md
index 2f8dbbd..2e8d5c2 100644
--- a/docs/en/setup/backend/backend-receivers.md
+++ b/docs/en/setup/backend/backend-receivers.md
@@ -13,6 +13,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **envoy-metric**. Envoy `metrics_service` and `ALS(access log service)` supported by this receiver. OAL script support all GAUGE type metrics.
1. **receiver_zipkin**. See [details](#zipkin-receiver).
1. **receiver_jaeger**. See [details](#jaeger-receiver).
+1. **receiver-profile**. gRPC services that accept profile task status and snapshot reports.
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
@@ -42,6 +43,8 @@ receiver_zipkin:
host: 0.0.0.0
port: 9411
contextPath: /
+receiver-profile:
+ default:
```
## gRPC/HTTP server for receiver
diff --git a/docs/en/setup/service-agent/java-agent/README.md b/docs/en/setup/service-agent/java-agent/README.md
index 0c0b305..ae3a34a 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -90,6 +90,7 @@ property key | Description | Default |
`collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`|
`collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`|
`collector.grpc_upstream_timeout`|How long grpc client will timeout in sending data to upstream. Unit is second.|`30` seconds|
+`collector.get_profile_task_interval`|Interval at which the sniffer fetches the profile task list. Unit is second.|`20`|
`logging.level`|The log level. Default is debug.|`DEBUG`|
`logging.file_name`|Log file name.|`skywalking-api.log`|
`logging.output`| Log output. Default is FILE. Use CONSOLE means output to stdout. |`FILE`|
@@ -102,6 +103,7 @@ property key | Description | Default |
`buffer.buffer_size`|The buffer size.|`300`|
`dictionary.service_code_buffer_size`|The buffer size of application codes and peer|`10 * 10000`|
`dictionary.endpoint_name_buffer_size`|The buffer size of endpoint names and peer|`1000 * 10000`|
+`profile.active`|If true, the SkyWalking agent will enable profiling when the user creates a new profile task. Otherwise, profiling is disabled.|`true`|
`plugin.peer_max_length `|Peer maximum description limit.|`200`|
`plugin.mongodb.trace_param`|If true, trace all the parameters in MongoDB access, default is false. Only trace the operation, not include parameters.|`false`|
`plugin.mongodb.filter_length_limit`|If set to positive number, the `WriteRequest.params` would be truncated to this length, otherwise it would be completely saved, which may cause performance problem.|`256`|
diff --git a/oap-server/server-bootstrap/pom.xml b/oap-server/server-bootstrap/pom.xml
index fe670f3..573a9f2 100644
--- a/oap-server/server-bootstrap/pom.xml
+++ b/oap-server/server-bootstrap/pom.xml
@@ -116,6 +116,11 @@
<artifactId>skywalking-so11y-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>skywalking-profile-receiver-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- receiver module -->
<!-- storage module -->
diff --git a/oap-server/server-bootstrap/src/test/resources/application.yml b/oap-server/server-bootstrap/src/test/resources/application.yml
index 20ee234..4fabdd8 100755
--- a/oap-server/server-bootstrap/src/test/resources/application.yml
+++ b/oap-server/server-bootstrap/src/test/resources/application.yml
@@ -100,6 +100,8 @@ receiver-clr:
default:
#receiver-so11y:
# default:
+receiver-profile:
+ default:
service-mesh:
default:
bufferPath: ${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 49fc220..cf09e77 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -20,10 +20,8 @@ package org.apache.skywalking.oap.server.core;
import java.util.ArrayList;
import java.util.List;
-import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+
+import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
@@ -82,6 +80,7 @@ public class CoreModule extends ModuleDefine {
private void addProfileService(List<Class> classes) {
classes.add(ProfileTaskMutationService.class);
classes.add(ProfileTaskQueryService.class);
+ classes.add(ProfileTaskCache.class);
}
private void addQueryService(List<Class> classes) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index df6db62..74a2113 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -67,6 +67,11 @@ public class CoreModuleConfig extends ModuleConfig {
private long maxSizeOfEndpointInventory = 1_000_000L;
private long maxSizeOfNetworkInventory = 1_000_000L;
+ /**
+ * Following are cache setting for none stream(s)
+ */
+ private long maxSizeOfProfileTask = 10_000L;
+
CoreModuleConfig() {
this.downsampling = new ArrayList<>();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 4286cb7..0713497 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -170,6 +170,7 @@ public class CoreModuleProvider extends ModuleProvider {
// add profile service implementations
this.registerServiceImplementation(ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
this.registerServiceImplementation(ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager()));
+ this.registerServiceImplementation(ProfileTaskCache.class, new ProfileTaskCache(getManager(), moduleConfig));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
index f209b0a..e55984e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
@@ -18,13 +18,18 @@
package org.apache.skywalking.oap.server.core.cache;
+import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
@@ -56,6 +61,7 @@ public enum CacheUpdateTimer {
updateServiceInventory(moduleDefineHolder);
updateServiceInstanceInventory(moduleDefineHolder);
updateNetAddressInventory(moduleDefineHolder);
+ updateProfileTask(moduleDefineHolder);
}
private void updateServiceInventory(ModuleDefineHolder moduleDefineHolder) {
@@ -115,4 +121,25 @@ public enum CacheUpdateTimer {
}
});
}
+
+    /**
+     * Reload the profile tasks that fall into the cache time-bucket window and store them
+     * in the {@link ProfileTaskCache}, one list per service id. A storage failure is logged
+     * as a warning and the cache is left as it is.
+     *
+     * @param moduleDefineHolder used to look up the storage DAO and the cache service
+     */
+    private void updateProfileTask(ModuleDefineHolder moduleDefineHolder) {
+        final IProfileTaskQueryDAO queryDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IProfileTaskQueryDAO.class);
+        final ProfileTaskCache cache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ProfileTaskCache.class);
+        try {
+            final List<ProfileTask> allTasks = queryDAO.getTaskList(null, null, cache.getCacheStartTimeBucket(), cache.getCacheEndTimeBucket(), null);
+
+            // group the tasks by their owning service, then refresh each service's cache entry
+            final Map<Integer, List<ProfileTask>> tasksByService = allTasks.stream().collect(Collectors.groupingBy(t -> t.getServiceId()));
+            for (Map.Entry<Integer, List<ProfileTask>> entry : tasksByService.entrySet()) {
+                cache.saveTaskList(entry.getKey(), entry.getValue());
+            }
+        } catch (IOException e) {
+            logger.warn("Unable to update profile task cache", e);
+        }
+    }
}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
new file mode 100644
index 0000000..b790fbc
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.cache;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.skywalking.oap.server.core.CoreModuleConfig;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cache of the profile task lists, keyed by service id. Entries are written through
+ * {@link #saveTaskList(int, List)} and expire one minute after they were written.
+ *
+ * @author MrPro
+ */
+public class ProfileTaskCache implements Service {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProfileTaskCache.class);
+
+    private final Cache<Integer, List<ProfileTask>> profileTaskCache;
+
+    private final ModuleManager moduleManager;
+
+    /**
+     * Build the cache. Its maximum size comes from the core module configuration and its
+     * initial capacity is a tenth of that maximum, clamped to the int range.
+     *
+     * @param moduleManager the module manager, kept for later use
+     * @param moduleConfig the core module configuration providing the maximum cache size
+     */
+    public ProfileTaskCache(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
+        this.moduleManager = moduleManager;
+
+        final long maxSize = moduleConfig.getMaxSizeOfProfileTask();
+        final int initialCapacity = (int) Math.min(maxSize / 10L, Integer.MAX_VALUE);
+
+        profileTaskCache = CacheBuilder.newBuilder()
+            .initialCapacity(initialCapacity)
+            .maximumSize(maxSize)
+            // remove old profile task data
+            .expireAfterWrite(Duration.ofMinutes(1))
+            .build();
+    }
+
+    /**
+     * Look up the cached profile task list of a service. Only the cache is read here;
+     * filling it is left to the cache update timer mechanism.
+     *
+     * @param serviceId the id of the service
+     * @return the cached task list, or {@code null} when no entry is present
+     */
+    public List<ProfileTask> getProfileTaskList(int serviceId) {
+        return profileTaskCache.getIfPresent(serviceId);
+    }
+
+    /**
+     * Store the task list of a service, replacing any previous entry.
+     *
+     * @param serviceId the id of the service
+     * @param taskList the tasks to cache; {@code null} is stored as an empty list
+     */
+    public void saveTaskList(int serviceId, List<ProfileTask> taskList) {
+        final List<ProfileTask> toCache = taskList != null ? taskList : Collections.<ProfileTask>emptyList();
+        profileTaskCache.put(serviceId, toCache);
+    }
+
+    /**
+     * Lower bound of the time-bucket window used for every db query: the record time bucket of "now".
+     *
+     * @return the start time bucket
+     */
+    public long getCacheStartTimeBucket() {
+        final long now = System.currentTimeMillis();
+        return TimeBucket.getRecordTimeBucket(now);
+    }
+
+    /**
+     * Upper bound of the time-bucket window used for every db query: the record time bucket of
+     * "now" plus 25 minutes. Per the original author's note this is +10 for the start time and
+     * +15 for the end time, because the task end time is used to search.
+     *
+     * @return the end time bucket
+     */
+    public long getCacheEndTimeBucket() {
+        final long windowEnd = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(25);
+        return TimeBucket.getRecordTimeBucket(windowEnd);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
index da8ccee..cb6fa43 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
@@ -18,7 +18,9 @@
package org.apache.skywalking.oap.server.core.command;
+import org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand;
import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
@@ -39,6 +41,11 @@ public class CommandService implements Service {
return new ServiceResetCommand(serialNumber);
}
+    /**
+     * Build the command that carries a profile task, identified by a freshly generated random UUID
+     * as its serial number.
+     *
+     * @param task the profile task whose settings are copied into the command
+     * @return the new profile task command
+     */
+    public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) {
+        return new ProfileTaskCommand(
+            UUID.randomUUID().toString(),
+            task.getEndpointName(),
+            task.getDuration(),
+            task.getMinDurationThreshold(),
+            task.getDumpPeriod(),
+            task.getMaxSamplingCount(),
+            task.getStartTime(),
+            task.getCreateTime());
+    }
+
private String generateSerialNumber(final int serviceInstanceId, final long time, final String serviceInstanceUUID) {
return UUID.randomUUID().toString(); // Simply generate a uuid without taking care of the parameters
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskLogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskLogRecord.java
new file mode 100644
index 0000000..4ff10f4
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskLogRecord.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.core.profile;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_LOG;
+
+/**
+ * Storage bean of a profile task log entry, persisted as a record. It holds the task id,
+ * the instance id, the operation type and the operation time.
+ *
+ * @author MrPro
+ */
+@Getter
+@Setter
+@ScopeDeclaration(id = PROFILE_TASK_LOG, name = "ProfileTaskLog")
+@Stream(name = ProfileTaskLogRecord.INDEX_NAME, scopeId = PROFILE_TASK_LOG, builder = ProfileTaskLogRecord.Builder.class, processor = RecordStreamProcessor.class)
+public class ProfileTaskLogRecord extends Record {
+
+    public static final String INDEX_NAME = "profile_task_log";
+    public static final String TASK_ID = "task_id";
+    public static final String INSTANCE_ID = "instance_id";
+    public static final String OPERATION_TYPE = "operation_type";
+    public static final String OPERATION_TIME = "operation_time";
+
+    @Column(columnName = TASK_ID)
+    private String taskId;
+    @Column(columnName = INSTANCE_ID)
+    private int instanceId;
+    @Column(columnName = OPERATION_TYPE)
+    private int operationType;
+    @Column(columnName = OPERATION_TIME)
+    private long operationTime;
+
+    /**
+     * @return the record id: task id, instance id, operation type and operation time,
+     * joined by {@link Const#ID_SPLIT}
+     */
+    @Override
+    public String id() {
+        return new StringBuilder()
+            .append(getTaskId()).append(Const.ID_SPLIT)
+            .append(getInstanceId()).append(Const.ID_SPLIT)
+            .append(getOperationType()).append(Const.ID_SPLIT)
+            .append(getOperationTime())
+            .toString();
+    }
+
+    /**
+     * Converts between {@link ProfileTaskLogRecord} and its column-name keyed map form.
+     */
+    public static class Builder implements StorageBuilder<ProfileTaskLogRecord> {
+
+        /**
+         * Rebuild a record from the column map read from storage.
+         */
+        @Override
+        public ProfileTaskLogRecord map2Data(Map<String, Object> dbMap) {
+            final ProfileTaskLogRecord logRecord = new ProfileTaskLogRecord();
+            logRecord.setTaskId((String) dbMap.get(TASK_ID));
+            logRecord.setInstanceId(((Number) dbMap.get(INSTANCE_ID)).intValue());
+            logRecord.setOperationType(((Number) dbMap.get(OPERATION_TYPE)).intValue());
+            logRecord.setOperationTime(((Number) dbMap.get(OPERATION_TIME)).longValue());
+            logRecord.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+            return logRecord;
+        }
+
+        /**
+         * Flatten a record into the column map written to storage.
+         */
+        @Override
+        public Map<String, Object> data2Map(ProfileTaskLogRecord storageData) {
+            final Map<String, Object> columns = new HashMap<>();
+            columns.put(TASK_ID, storageData.getTaskId());
+            columns.put(INSTANCE_ID, storageData.getInstanceId());
+            columns.put(OPERATION_TYPE, storageData.getOperationType());
+            columns.put(OPERATION_TIME, storageData.getOperationTime());
+            columns.put(TIME_BUCKET, storageData.getTimeBucket());
+            return columns;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskMutationService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskMutationService.java
index c295fca..2ba4f67 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskMutationService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskMutationService.java
@@ -17,6 +17,7 @@
*/
package org.apache.skywalking.oap.server.core.profile;
+import org.apache.skywalking.apm.network.constants.ProfileConstants;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -60,17 +61,18 @@ public class ProfileTaskMutationService implements Service {
* @param monitorDuration monitor task duration(minute)
* @param minDurationThreshold min duration threshold
* @param dumpPeriod dump period
+ * @param maxSamplingCount max trace count on sniffer
* @return task create result
*/
public ProfileTaskCreationResult createTask(final int serviceId, final String endpointName, final long monitorStartTime, final int monitorDuration,
- final int minDurationThreshold, final int dumpPeriod) throws IOException {
+ final int minDurationThreshold, final int dumpPeriod, final int maxSamplingCount) throws IOException {
// calculate task execute range
long taskStartTime = monitorStartTime > 0 ? monitorStartTime : System.currentTimeMillis();
long taskEndTime = taskStartTime + TimeUnit.MINUTES.toMillis(monitorDuration);
// check data
- final String errorMessage = checkDataSuccess(serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod);
+ final String errorMessage = checkDataSuccess(serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod, maxSamplingCount);
if (errorMessage != null) {
return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();
}
@@ -85,6 +87,7 @@ public class ProfileTaskMutationService implements Service {
task.setMinDurationThreshold(minDurationThreshold);
task.setDumpPeriod(dumpPeriod);
task.setCreateTime(createTime);
+ task.setMaxSamplingCount(maxSamplingCount);
task.setTimeBucket(TimeBucket.getRecordTimeBucket(taskEndTime));
NoneStreamingProcessor.getInstance().in(task);
@@ -92,7 +95,7 @@ public class ProfileTaskMutationService implements Service {
}
private String checkDataSuccess(final Integer serviceId, final String endpointName, final long monitorStartTime, final long monitorEndTime, final int monitorDuration,
- final int minDurationThreshold, final int dumpPeriod) throws IOException {
+ final int minDurationThreshold, final int dumpPeriod, final int maxSamplingCount) throws IOException {
// basic check
if (serviceId == null) {
return "service cannot be null";
@@ -100,23 +103,27 @@ public class ProfileTaskMutationService implements Service {
if (StringUtil.isEmpty(endpointName)) {
return "endpoint name cannot be empty";
}
- if (monitorEndTime - monitorStartTime < TimeUnit.MINUTES.toMillis(1)) {
- return "monitor duration must greater than 1 minutes";
+ if (monitorDuration < ProfileConstants.TASK_DURATION_MIN_MINUTE) {
+ return "monitor duration must greater than " + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes";
}
if (minDurationThreshold < 0) {
return "min duration threshold must greater than or equals zero";
}
+ if (maxSamplingCount <= 0) {
+ return "max sampling count must greater than zero";
+ }
// check limit
- // The duration of the monitoring task cannot be greater than 15 minutes
- final long maxMonitorDurationInSec = TimeUnit.MINUTES.toSeconds(15);
- if (monitorDuration > maxMonitorDurationInSec) {
- return "The duration of the monitoring task cannot be greater than 15 minutes";
+ if (monitorDuration > ProfileConstants.TASK_DURATION_MAX_MINUTE) {
+ return "The duration of the monitoring task cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes";
+ }
+
+ if (dumpPeriod < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS) {
+ return "dump period must be greater than or equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds";
}
- // dump period must be greater than or equals 10 milliseconds
- if (dumpPeriod < 10) {
- return "dump period must be greater than or equals 10 milliseconds";
+ if (maxSamplingCount >= ProfileConstants.TASK_MAX_SAMPLING_COUNT) {
+ return "max sampling count must less than " + ProfileConstants.TASK_MAX_SAMPLING_COUNT;
}
// Each service can monitor up to 1 endpoints during the execution of tasks
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskNoneStream.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskNoneStream.java
index 9e65c6b..446f97f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskNoneStream.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskNoneStream.java
@@ -52,6 +52,7 @@ public class ProfileTaskNoneStream extends NoneStream {
public static final String MIN_DURATION_THRESHOLD = "min_duration_threshold";
public static final String DUMP_PERIOD = "dump_period";
public static final String CREATE_TIME = "create_time";
+ public static final String MAX_SAMPLING_COUNT = "max_sampling_count";
@Override
public String id() {
@@ -65,6 +66,7 @@ public class ProfileTaskNoneStream extends NoneStream {
@Column(columnName = MIN_DURATION_THRESHOLD) private int minDurationThreshold;
@Column(columnName = DUMP_PERIOD) private int dumpPeriod;
@Column(columnName = CREATE_TIME) private long createTime;
+ @Column(columnName = MAX_SAMPLING_COUNT) private int maxSamplingCount;
public static class Builder implements StorageBuilder<ProfileTaskNoneStream> {
@@ -79,6 +81,7 @@ public class ProfileTaskNoneStream extends NoneStream {
record.setDumpPeriod(((Number)dbMap.get(DUMP_PERIOD)).intValue());
record.setCreateTime(((Number)dbMap.get(CREATE_TIME)).longValue());
record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+ record.setMaxSamplingCount(((Number)dbMap.get(MAX_SAMPLING_COUNT)).intValue());
return record;
}
@@ -93,6 +96,7 @@ public class ProfileTaskNoneStream extends NoneStream {
map.put(DUMP_PERIOD, storageData.getDumpPeriod());
map.put(CREATE_TIME, storageData.getCreateTime());
map.put(TIME_BUCKET, storageData.getTimeBucket());
+ map.put(MAX_SAMPLING_COUNT, storageData.getMaxSamplingCount());
return map;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ProfileTaskQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ProfileTaskQueryService.java
index c111a80..df4ebff 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ProfileTaskQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ProfileTaskQueryService.java
@@ -17,18 +17,25 @@
*/
package org.apache.skywalking.oap.server.core.query;
+import com.google.common.base.Objects;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
import static java.util.Objects.isNull;
@@ -40,14 +47,16 @@ import static java.util.Objects.isNull;
public class ProfileTaskQueryService implements Service {
private final ModuleManager moduleManager;
private IProfileTaskQueryDAO profileTaskQueryDAO;
+ private IProfileTaskLogQueryDAO profileTaskLogQueryDAO;
private ServiceInventoryCache serviceInventoryCache;
+ private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
public ProfileTaskQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IProfileTaskQueryDAO getProfileTaskDAO() {
- if (profileTaskQueryDAO == null) {
+ if (isNull(profileTaskQueryDAO)) {
this.profileTaskQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IProfileTaskQueryDAO.class);
}
return profileTaskQueryDAO;
@@ -60,6 +69,20 @@ public class ProfileTaskQueryService implements Service {
return serviceInventoryCache;
}
+ public IProfileTaskLogQueryDAO getProfileTaskLogQueryDAO() {
+ if (isNull(profileTaskLogQueryDAO)) {
+ profileTaskLogQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IProfileTaskLogQueryDAO.class);
+ }
+ return profileTaskLogQueryDAO;
+ }
+
+ public ServiceInstanceInventoryCache getServiceInstanceInventoryCache() {
+ if (isNull(serviceInstanceInventoryCache)) {
+ serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
+ }
+ return serviceInstanceInventoryCache;
+ }
+
/**
* search profile task list
* @param serviceId monitor service
@@ -69,14 +92,31 @@ public class ProfileTaskQueryService implements Service {
public List<ProfileTask> getTaskList(Integer serviceId, String endpointName) throws IOException {
final List<ProfileTask> tasks = getProfileTaskDAO().getTaskList(serviceId, endpointName, null, null, null);
+ // query all task logs once, then match them to each task below
+ List<ProfileTaskLog> taskLogList = getProfileTaskLogQueryDAO().getTaskLogList(null);
+ if (taskLogList == null) {
+ taskLogList = Collections.emptyList();
+ }
+
// add service name
if (CollectionUtils.isNotEmpty(tasks)) {
final ServiceInventoryCache serviceInventoryCache = getServiceInventoryCache();
+ final ServiceInstanceInventoryCache serviceInstanceInventoryCache = getServiceInstanceInventoryCache();
for (ProfileTask task : tasks) {
final ServiceInventory serviceInventory = serviceInventoryCache.get(task.getServiceId());
if (serviceInventory != null) {
task.setServiceName(serviceInventory.getName());
}
+
+ // filter all task logs
+ task.setLogs(taskLogList.stream().filter(l -> Objects.equal(l.getTaskId(), task.getId())).map(l -> {
+ // get instance name from cache
+ final ServiceInstanceInventory instanceInventory = serviceInstanceInventoryCache.get(l.getInstanceId());
+ if (instanceInventory != null) {
+ l.setInstanceName(instanceInventory.getName());
+ }
+ return l;
+ }).collect(Collectors.toList()));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java
index 2c2bac2..fd5a1b2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java
@@ -20,6 +20,8 @@ package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
+import java.util.List;
+
/**
* @author MrPro
*/
@@ -35,8 +37,12 @@ public class ProfileTask {
private String serviceName;
private String endpointName;
private long startTime;
+ private long createTime;
private int duration;
private int minDurationThreshold;
private int dumpPeriod;
+ private int maxSamplingCount;
+
+ private List<ProfileTaskLog> logs;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTaskLog.java
similarity index 78%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTaskLog.java
index 2c2bac2..018fdb5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTaskLog.java
@@ -15,12 +15,13 @@
* limitations under the License.
*
*/
-
package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
/**
+ * Profile task execute log
+ *
* @author MrPro
*/
@Setter
@@ -28,15 +29,17 @@ import lombok.*;
@NoArgsConstructor
@AllArgsConstructor
@Builder
-public class ProfileTask {
+public class ProfileTaskLog {
private String id;
- private int serviceId;
- private String serviceName;
- private String endpointName;
- private long startTime;
- private int duration;
- private int minDurationThreshold;
- private int dumpPeriod;
+ private String taskId;
+
+ // instance
+ private int instanceId;
+ private String instanceName;
+
+ // operation
+ private ProfileTaskLogOperationType operationType;
+ private long operationTime;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTaskLogOperationType.java
similarity index 50%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTaskLogOperationType.java
index 2c2bac2..678492b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTask.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/ProfileTaskLogOperationType.java
@@ -15,28 +15,47 @@
* limitations under the License.
*
*/
-
package org.apache.skywalking.oap.server.core.query.entity;
-import lombok.*;
+import java.util.HashMap;
+import java.util.Map;
/**
+ * Profile task log operation type
+ *
* @author MrPro
*/
-@Setter
-@Getter
-@NoArgsConstructor
-@AllArgsConstructor
-@Builder
-public class ProfileTask {
-
- private String id;
- private int serviceId;
- private String serviceName;
- private String endpointName;
- private long startTime;
- private int duration;
- private int minDurationThreshold;
- private int dumpPeriod;
+public enum ProfileTaskLogOperationType {
+
+ // the sniffer has been notified of the profile task
+ NOTIFIED(1),
+ // the sniffer has reported that the task execution finished
+ EXECUTION_FINISHED(2);
+
+ private int code;
+ private static final Map<Integer, ProfileTaskLogOperationType> CACHE = new HashMap<Integer, ProfileTaskLogOperationType>();
+
+ static {
+ for (ProfileTaskLogOperationType val :ProfileTaskLogOperationType.values()) {
+ CACHE.put(val.getCode(), val);
+ }
+ }
+
+ /**
+ * Parse operation type by code
+ * @param code operation type code
+ * @return the operation type registered for the code, or null if none matches
+ */
+ public static ProfileTaskLogOperationType parse(int code) {
+ return CACHE.get(code);
+ }
+
+ ProfileTaskLogOperationType(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return this.code;
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 3187cb6..54f9dfc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -68,6 +68,7 @@ public class DefaultScopeDefine {
public static final int JAEGER_SPAN = 24;
public static final int HTTP_ACCESS_LOG = 25;
public static final int PROFILE_TASK = 26;
+ public static final int PROFILE_TASK_LOG = 27;
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index e5a1cee..1ae6d54 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.module.*;
@@ -41,6 +42,6 @@ public class StorageModule extends ModuleDefine {
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
ITopologyQueryDAO.class, IMetricsQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class,
- ITopNRecordsQueryDAO.class, ILogQueryDAO.class, IProfileTaskQueryDAO.class};
+ ITopNRecordsQueryDAO.class, ILogQueryDAO.class, IProfileTaskQueryDAO.class, IProfileTaskLogQueryDAO.class};
}
}
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskLogQueryDAO.java
similarity index 60%
copy from test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskLogQueryDAO.java
index 78323c9..98110f7 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskLogQueryDAO.java
@@ -16,27 +16,26 @@
*
*/
-package org.apache.skywalking.e2e.profile.creation;
+package org.apache.skywalking.oap.server.core.storage.profile;
-import lombok.*;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.storage.DAO;
+
+import java.io.IOException;
+import java.util.List;
/**
- * e2e profile, create profile task entity
+ * handles all profile task log queries
*
* @author MrPro
*/
-@Setter
-@Getter
-@NoArgsConstructor
-@AllArgsConstructor
-@Builder
-public class ProfileTaskCreationRequest {
+public interface IProfileTaskLogQueryDAO extends DAO {
- private int serviceId;
- private String endpointName;
- private long startTime;
- private int duration;
- private int minDurationThreshold;
- private int dumpPeriod;
+ /**
+ * search the task logs that belong to the given profile task id
+ * @param taskId profile task id, may be null
+ * @return the matching profile task logs
+ */
+ List<ProfileTaskLog> getTaskLogList(final String taskId) throws IOException;
}
diff --git a/oap-server/server-library/library-util/src/test/resources/application.yml b/oap-server/server-library/library-util/src/test/resources/application.yml
index 613ff11..a95e875 100755
--- a/oap-server/server-library/library-util/src/test/resources/application.yml
+++ b/oap-server/server-library/library-util/src/test/resources/application.yml
@@ -66,6 +66,8 @@ receiver-trace:
bufferFileCleanWhenRestart: ${RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
receiver-jvm:
default:
+receiver-profile:
+ default:
service-mesh:
default:
bufferPath: ${SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ProfileMutation.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ProfileMutation.java
index 659344d..b9b861b 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ProfileMutation.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ProfileMutation.java
@@ -54,7 +54,8 @@ public class ProfileMutation implements GraphQLMutationResolver {
creationRequest.getStartTime() == null ? -1 : creationRequest.getStartTime(),
creationRequest.getDuration(),
creationRequest.getMinDurationThreshold(),
- creationRequest.getDumpPeriod()
+ creationRequest.getDumpPeriod(),
+ creationRequest.getMaxSamplingCount()
);
}
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/ProfileTaskCreationRequest.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/ProfileTaskCreationRequest.java
index 370f120..e7dec26 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/ProfileTaskCreationRequest.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/ProfileTaskCreationRequest.java
@@ -37,5 +37,6 @@ public class ProfileTaskCreationRequest {
private Step durationUnit;
private int minDurationThreshold;
private int dumpPeriod;
+ private int maxSamplingCount;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index a9b4813..dde9a0d 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit a9b48130626e4b4dcf46bb8268c7125cc6f50814
+Subproject commit dde9a0dad56617ccbf4226f5f71e667fd9620222
diff --git a/oap-server/server-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/pom.xml
index c86d616..33657f6 100644
--- a/oap-server/server-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/pom.xml
@@ -40,6 +40,7 @@
<module>jaeger-receiver-plugin</module>
<module>receiver-proto</module>
<module>skywalking-so11y-receiver-plugin</module>
+ <module>skywalking-profile-receiver-plugin</module>
</modules>
<dependencies>
diff --git a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/pom.xml
new file mode 100644
index 0000000..2b1e510
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>server-receiver-plugin</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>7.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>skywalking-profile-receiver-plugin</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>skywalking-sharing-server-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/module/ProfileModule.java
similarity index 67%
copy from test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
copy to oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/module/ProfileModule.java
index 78323c9..171e0c9 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
+++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/module/ProfileModule.java
@@ -16,27 +16,25 @@
*
*/
-package org.apache.skywalking.e2e.profile.creation;
+package org.apache.skywalking.oap.server.receiver.profile.module;
-import lombok.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
- * e2e profile, create profile task entity
+ * profile task receiver
*
* @author MrPro
*/
-@Setter
-@Getter
-@NoArgsConstructor
-@AllArgsConstructor
-@Builder
-public class ProfileTaskCreationRequest {
+public class ProfileModule extends ModuleDefine {
- private int serviceId;
- private String endpointName;
- private long startTime;
- private int duration;
- private int minDurationThreshold;
- private int dumpPeriod;
+ public static final String NAME = "receiver-profile";
+ public ProfileModule() {
+ super(NAME);
+ }
+
+ @Override
+ public Class[] services() {
+ return new Class[0];
+ }
}
diff --git a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/ProfileModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/ProfileModuleProvider.java
new file mode 100644
index 0000000..7936e43
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/ProfileModuleProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.profile.provider;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.receiver.profile.module.ProfileModule;
+import org.apache.skywalking.oap.server.receiver.profile.provider.handler.ProfileTaskServiceHandler;
+import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
+
+/**
+ * profile task receiver default provider
+ *
+ * @author MrPro
+ */
+public class ProfileModuleProvider extends ModuleProvider {
+ @Override
+ public String name() {
+ return "default";
+ }
+
+ @Override
+ public Class<? extends ModuleDefine> module() {
+ return ProfileModule.class;
+ }
+
+ @Override
+ public ModuleConfig createConfigBeanIfAbsent() {
+ return null;
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ }
+
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
+ grpcHandlerRegister.addHandler(new ProfileTaskServiceHandler(getManager()));
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[] {CoreModule.NAME, SharingServerModule.NAME};
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
new file mode 100644
index 0000000..0511808
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.profile.provider.handler;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.common.Commands;
+import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
+import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
+import org.apache.skywalking.oap.server.core.command.CommandService;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author MrPro
+ */
+public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBase implements GRPCHandler {
+
+ private ProfileTaskCache profileTaskCache;
+ private final CommandService commandService;
+
+ public ProfileTaskServiceHandler(ModuleManager moduleManager) {
+ this.profileTaskCache = moduleManager.find(CoreModule.NAME).provider().getService(ProfileTaskCache.class);
+ this.commandService = moduleManager.find(CoreModule.NAME).provider().getService(CommandService.class);
+ }
+
+ @Override
+ public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {
+ // query profile task list by service id
+ final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(request.getServiceId());
+ if (CollectionUtils.isEmpty(profileTaskList)) {
+ responseObserver.onNext(Commands.newBuilder().build());
+ responseObserver.onCompleted();
+ return;
+ }
+
+ // build command list
+ final Commands.Builder commandsBuilder = Commands.newBuilder();
+ final long lastCommandTime = request.getLastCommandTime();
+
+ for (ProfileTask profileTask : profileTaskList) {
+ // if the task create time is not later than the last command time, the sniffer already has this task
+ if (profileTask.getCreateTime() <= lastCommandTime) {
+ continue;
+ }
+
+ // record profile task log
+ recordProfileTaskLog(profileTask, request);
+
+ // add command
+ commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
+ }
+
+ responseObserver.onNext(commandsBuilder.build());
+ responseObserver.onCompleted();
+ }
+
+ private void recordProfileTaskLog(ProfileTask task, ProfileTaskCommandQuery query) {
+ final ProfileTaskLogRecord logRecord = new ProfileTaskLogRecord();
+ logRecord.setTaskId(task.getId());
+ logRecord.setInstanceId(query.getInstanceId());
+ logRecord.setOperationType(ProfileTaskLogOperationType.NOTIFIED.getCode());
+ logRecord.setOperationTime(System.currentTimeMillis());
+ // use the task end time for the time bucket, so the log record gets the same TTL as the profile task
+ logRecord.setTimeBucket(TimeBucket.getRecordTimeBucket(task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration())));
+
+ RecordStreamProcessor.getInstance().in(logRecord);
+ }
+
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
similarity index 54%
copy from apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
copy to oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
index 7dbc784..e3b5695 100644
--- a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
+++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
@@ -16,13 +16,4 @@
#
#
-org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient
-org.apache.skywalking.apm.agent.core.context.ContextManager
-org.apache.skywalking.apm.agent.core.sampling.SamplingService
-org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
-org.apache.skywalking.apm.agent.core.jvm.JVMService
-org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient
-org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
-org.apache.skywalking.apm.agent.core.commands.CommandService
-org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
-org.apache.skywalking.apm.agent.core.context.OperationNameFormatService
+org.apache.skywalking.oap.server.receiver.profile.module.ProfileModule
diff --git a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
similarity index 54%
copy from apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
copy to oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
index 7dbc784..198287a 100644
--- a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
+++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -16,13 +16,4 @@
#
#
-org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient
-org.apache.skywalking.apm.agent.core.context.ContextManager
-org.apache.skywalking.apm.agent.core.sampling.SamplingService
-org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
-org.apache.skywalking.apm.agent.core.jvm.JVMService
-org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient
-org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
-org.apache.skywalking.apm.agent.core.commands.CommandService
-org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
-org.apache.skywalking.apm.agent.core.context.OperationNameFormatService
+org.apache.skywalking.oap.server.receiver.profile.provider.ProfileModuleProvider
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 3a9dd34..e451ac6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -36,6 +36,7 @@ import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCac
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
@@ -119,6 +120,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config.getProfileTaskQueryMaxSize()));
+ this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config.getProfileTaskQueryMaxSize()));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskLogEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskLogEsDAO.java
new file mode 100644
index 0000000..0f0a230
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskLogEsDAO.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Query DAO that reads profile task log records from the
+ * {@code ProfileTaskLogRecord.INDEX_NAME} index through the Elasticsearch client.
+ *
+ * @author MrPro
+ */
+public class ProfileTaskLogEsDAO extends EsDAO implements IProfileTaskLogQueryDAO {
+    private final int queryMaxSize;
+
+    /**
+     * @param client Elasticsearch client passed on to the {@link EsDAO} base class
+     * @param profileTaskQueryMaxSize profile task query limit; multiplied by 50 to get the log query limit
+     */
+    public ProfileTaskLogEsDAO(ElasticSearchClient client, int profileTaskQueryMaxSize) {
+        super(client);
+        // log query limit = profile task query max size * per-task log count (50)
+        this.queryMaxSize = 50 * profileTaskQueryMaxSize;
+    }
+
+    /**
+     * Searches the task log index, newest operation first, returning at most
+     * {@code queryMaxSize} hits.
+     *
+     * @param taskId task id to match exactly; when {@code null} no task filter is applied
+     * @return the parsed log entries in the order returned by the search
+     * @throws IOException declared by the interface; presumably raised when the search call fails
+     */
+    @Override
+    public List<ProfileTaskLog> getTaskLogList(String taskId) throws IOException {
+        final BoolQueryBuilder filter = QueryBuilders.boolQuery();
+        if (taskId != null) {
+            filter.must(QueryBuilders.termQuery(ProfileTaskLogRecord.TASK_ID, taskId));
+        }
+
+        final SearchSourceBuilder searchSource = SearchSourceBuilder.searchSource()
+            .query(filter)
+            .sort(ProfileTaskLogRecord.OPERATION_TIME, SortOrder.DESC)
+            .size(queryMaxSize);
+
+        final SearchResponse response = getClient().search(ProfileTaskLogRecord.INDEX_NAME, searchSource);
+
+        final LinkedList<ProfileTaskLog> logs = new LinkedList<>();
+        for (SearchHit hit : response.getHits().getHits()) {
+            logs.add(parseTaskLog(hit));
+        }
+        return logs;
+    }
+
+    /**
+     * Converts one search hit into a {@link ProfileTaskLog}, using the hit id as the log id.
+     */
+    private ProfileTaskLog parseTaskLog(SearchHit data) {
+        final String logId = data.getId();
+        final String taskId = (String) data.getSourceAsMap().get(ProfileTaskLogRecord.TASK_ID);
+        final int instanceId = numberField(data, ProfileTaskLogRecord.INSTANCE_ID).intValue();
+        final int operationCode = numberField(data, ProfileTaskLogRecord.OPERATION_TYPE).intValue();
+        final ProfileTaskLogOperationType operationType = ProfileTaskLogOperationType.parse(operationCode);
+        final long operationTime = numberField(data, ProfileTaskLogRecord.OPERATION_TIME).longValue();
+
+        return ProfileTaskLog.builder()
+            .id(logId)
+            .taskId(taskId)
+            .instanceId(instanceId)
+            .operationType(operationType)
+            .operationTime(operationTime)
+            .build();
+    }
+
+    /**
+     * Reads a source field of the hit and casts it to {@link Number}.
+     */
+    private static Number numberField(SearchHit hit, String field) {
+        return (Number) hit.getSourceAsMap().get(field);
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
index c847f54..d7598cf 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
@@ -94,8 +94,10 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
.serviceId(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.SERVICE_ID)).intValue())
.endpointName((String) data.getSourceAsMap().get(ProfileTaskNoneStream.ENDPOINT_NAME))
.startTime(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.START_TIME)).longValue())
+ .createTime(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.CREATE_TIME)).longValue())
.duration(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.DURATION)).intValue())
.minDurationThreshold(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.MIN_DURATION_THRESHOLD)).intValue())
- .dumpPeriod(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.DUMP_PERIOD)).intValue()).build();
+ .dumpPeriod(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.DUMP_PERIOD)).intValue())
+ .maxSamplingCount(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.MAX_SAMPLING_COUNT)).intValue()).build();
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index e04aa1a..5fe5775 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCac
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
@@ -47,6 +48,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNRecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
@@ -124,6 +126,7 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
+ this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 4663ad2..0562648 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -22,6 +22,7 @@ import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.core.storage.ttl.GeneralStorageTTL;
@@ -92,6 +93,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new H2LogQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client));
+ this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskLogQueryDAO.java
new file mode 100644
index 0000000..27d9d4e
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskLogQueryDAO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * JDBC-based query DAO that reads profile task log rows from the
+ * {@code ProfileTaskLogRecord.INDEX_NAME} table.
+ *
+ * @author MrPro
+ */
+public class H2ProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO {
+    private final JDBCHikariCPClient h2Client;
+
+    /**
+     * @param h2Client JDBC client used to obtain connections and run the query
+     */
+    public H2ProfileTaskLogQueryDAO(JDBCHikariCPClient h2Client) {
+        this.h2Client = h2Client;
+    }
+
+    /**
+     * Selects task log rows ordered by operation time, newest first.
+     *
+     * @param taskId task id to match; when {@code null} every row of the table is selected
+     * @return the parsed log entries in result-set order
+     * @throws IOException wrapping any {@link SQLException} or {@code JDBCClientException} raised by the query
+     */
+    @Override
+    public List<ProfileTaskLog> getTaskLogList(String taskId) throws IOException {
+        final StringBuilder sql = new StringBuilder();
+        final List<Object> condition = new ArrayList<>(1);
+        sql.append("select * from ").append(ProfileTaskLogRecord.INDEX_NAME).append(" where 1=1 ");
+
+        if (taskId != null) {
+            sql.append(" and ").append(ProfileTaskLogRecord.TASK_ID).append(" = ? ");
+            // every "?" placeholder needs a matching value in the parameter list,
+            // otherwise the statement is executed with an unbound parameter
+            condition.add(taskId);
+        }
+
+        // leading space keeps the clause separated from whatever was appended before it
+        sql.append(" ORDER BY ").append(ProfileTaskLogRecord.OPERATION_TIME).append(" DESC ");
+
+        try (Connection connection = h2Client.getConnection();
+             ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
+            final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
+            while (resultSet.next()) {
+                tasks.add(parseLog(resultSet));
+            }
+            return tasks;
+        } catch (SQLException | JDBCClientException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Maps the current row of the result set to a {@link ProfileTaskLog}.
+     *
+     * @throws SQLException if a column cannot be read
+     */
+    private ProfileTaskLog parseLog(ResultSet data) throws SQLException {
+        return ProfileTaskLog.builder()
+            .id(data.getString("id"))
+            .taskId(data.getString(ProfileTaskLogRecord.TASK_ID))
+            .instanceId(data.getInt(ProfileTaskLogRecord.INSTANCE_ID))
+            .operationType(ProfileTaskLogOperationType.parse(data.getInt(ProfileTaskLogRecord.OPERATION_TYPE)))
+            .operationTime(data.getLong(ProfileTaskLogRecord.OPERATION_TIME))
+            .build();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
index fab463d..a526b49 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
@@ -98,8 +98,10 @@ public class H2ProfileTaskQueryDAO implements IProfileTaskQueryDAO {
.serviceId(data.getInt(ProfileTaskNoneStream.SERVICE_ID))
.endpointName(data.getString(ProfileTaskNoneStream.ENDPOINT_NAME))
.startTime(data.getLong(ProfileTaskNoneStream.START_TIME))
+ .createTime(data.getLong(ProfileTaskNoneStream.CREATE_TIME))
.duration(data.getInt(ProfileTaskNoneStream.DURATION))
.minDurationThreshold(data.getInt(ProfileTaskNoneStream.MIN_DURATION_THRESHOLD))
- .dumpPeriod(data.getInt(ProfileTaskNoneStream.DUMP_PERIOD)).build();
+ .dumpPeriod(data.getInt(ProfileTaskNoneStream.DUMP_PERIOD))
+ .maxSamplingCount(data.getInt(ProfileTaskNoneStream.MAX_SAMPLING_COUNT)).build();
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index 056e023..0fb4e3c 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCac
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
@@ -107,6 +108,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new MySQLLogQueryDAO(mysqlClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(mysqlClient));
+ this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(mysqlClient));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
diff --git a/test/e2e/e2e-mysql/src/docker/application.yml b/test/e2e/e2e-mysql/src/docker/application.yml
index 343c5cf..739246c 100644
--- a/test/e2e/e2e-mysql/src/docker/application.yml
+++ b/test/e2e/e2e-mysql/src/docker/application.yml
@@ -126,6 +126,8 @@ receiver-clr:
default:
#receiver-so11y:
# default:
+receiver-profile:
+ default:
service-mesh:
default:
bufferPath: ${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
diff --git a/test/e2e/e2e-profile/e2e-profile-es-test-runner/pom.xml b/test/e2e/e2e-profile/e2e-profile-es-test-runner/pom.xml
index 00de586..eacc182 100644
--- a/test/e2e/e2e-profile/e2e-profile-es-test-runner/pom.xml
+++ b/test/e2e/e2e-profile/e2e-profile-es-test-runner/pom.xml
@@ -116,6 +116,7 @@
</INSTRUMENTED_SERVICE_1>
<INSTRUMENTED_SERVICE_1_OPTS>
-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
+ -DSW_AGENT_PROFILE_ACTIVE=true
-DSW_AGENT_NAME=${provider.name}
-Dserver.port=9090
</INSTRUMENTED_SERVICE_1_OPTS>
diff --git a/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java b/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index c6ded0f..528c490 100644
--- a/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++ b/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -55,6 +55,7 @@ import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@@ -66,7 +67,7 @@ public class ProfileVerificationITCase {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileVerificationITCase.class);
private final RestTemplate restTemplate = new RestTemplate();
- private final int retryInterval = 30;
+ private final int retryInterval = 10;
private ProfileClient profileClient;
private String instrumentedServiceUrl;
@@ -74,10 +75,11 @@ public class ProfileVerificationITCase {
@Before
public void setUp() {
final String swWebappHost = System.getProperty("sw.webapp.host", "127.0.0.1");
-// final String swWebappPort = System.getProperty("sw.webapp.port", "32783");
+ // final String swWebappPort = System.getProperty("sw.webapp.port", "32783");
final String swWebappPort = System.getProperty("sw.webapp.port", "12800");
final String instrumentedServiceHost = System.getProperty("service.host", "127.0.0.1");
final String instrumentedServicePort = System.getProperty("service.port", "32782");
+ // final String instrumentedServicePort = System.getProperty("service.port", "9090");
profileClient = new ProfileClient(swWebappHost, swWebappPort);
instrumentedServiceUrl = "http://" + instrumentedServiceHost + ":" + instrumentedServicePort;
}
@@ -135,7 +137,8 @@ public class ProfileVerificationITCase {
.duration(5)
.startTime(-1)
.minDurationThreshold(10)
- .dumpPeriod(10).build();
+ .dumpPeriod(10)
+ .maxSamplingCount(5).build();
// verify create task
final ProfileTaskCreationResult creationResult = profileClient.createProfileTask(creationRequest);
@@ -144,19 +147,30 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
- // verify get task list
- final ProfileTasks tasks = profileClient.getProfileTaskList(
- new ProfileTaskQuery()
- .serviceId(creationRequest.getServiceId())
- .endpointName("")
- );
- LOGGER.info("get profile task list: {}", tasks);
+ // verify get task list and logs
+ for (int i = 0; i < 10; i++) {
+ try {
+ final ProfileTasks tasks = profileClient.getProfileTaskList(
+ new ProfileTaskQuery()
+ .serviceId(creationRequest.getServiceId())
+ .endpointName("")
+ );
+ LOGGER.info("get profile task list: {}", tasks);
- InputStream expectedInputStream =
- new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+ InputStream expectedInputStream =
+ new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+
+ final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
+ servicesMatcher.verify(tasks);
+ break;
+ } catch (Throwable e) {
+ if (i == 10 - 1) {
+ throw new IllegalStateException("match profile task list fail!", e);
+ }
+ TimeUnit.SECONDS.sleep(retryInterval);
+ }
+ }
- final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
- servicesMatcher.verify(tasks);
}
private void verifyServices(LocalDateTime minutesAgo) throws Exception {
diff --git a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/pom.xml b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/pom.xml
index 993eb1d..eb8a3d5 100644
--- a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/pom.xml
+++ b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/pom.xml
@@ -75,6 +75,7 @@
</INSTRUMENTED_SERVICE_1>
<INSTRUMENTED_SERVICE_1_OPTS>
-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
+ -DSW_AGENT_PROFILE_ACTIVE=true
-DSW_AGENT_NAME=${provider.name}
-Dserver.port=9090
</INSTRUMENTED_SERVICE_1_OPTS>
diff --git a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index 8fa6128..cab6dff 100644
--- a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++ b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -55,6 +55,7 @@ import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@@ -66,7 +67,7 @@ public class ProfileVerificationITCase {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileVerificationITCase.class);
private final RestTemplate restTemplate = new RestTemplate();
- private final int retryInterval = 30;
+ private final int retryInterval = 10;
private ProfileClient profileClient;
private String instrumentedServiceUrl;
@@ -78,6 +79,7 @@ public class ProfileVerificationITCase {
final String swWebappPort = System.getProperty("sw.webapp.port", "12800");
final String instrumentedServiceHost = System.getProperty("service.host", "127.0.0.1");
final String instrumentedServicePort = System.getProperty("service.port", "32782");
+ // final String instrumentedServicePort = System.getProperty("service.port", "9090");
profileClient = new ProfileClient(swWebappHost, swWebappPort);
instrumentedServiceUrl = "http://" + instrumentedServiceHost + ":" + instrumentedServicePort;
}
@@ -134,7 +136,8 @@ public class ProfileVerificationITCase {
.duration(5)
.startTime(-1)
.minDurationThreshold(10)
- .dumpPeriod(10).build();
+ .dumpPeriod(10)
+ .maxSamplingCount(5).build();
// verify create task
final ProfileTaskCreationResult creationResult = profileClient.createProfileTask(creationRequest);
@@ -143,19 +146,30 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
- // verify get task list
- final ProfileTasks tasks = profileClient.getProfileTaskList(
- new ProfileTaskQuery()
- .serviceId(creationRequest.getServiceId())
- .endpointName("")
- );
- LOGGER.info("get profile task list: {}", tasks);
+ // verify get task list and logs
+ for (int i = 0; i < 10; i++) {
+ try {
+ final ProfileTasks tasks = profileClient.getProfileTaskList(
+ new ProfileTaskQuery()
+ .serviceId(creationRequest.getServiceId())
+ .endpointName("")
+ );
+ LOGGER.info("get profile task list: {}", tasks);
- InputStream expectedInputStream =
- new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+ InputStream expectedInputStream =
+ new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+
+ final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
+ servicesMatcher.verify(tasks);
+ break;
+ } catch (Throwable e) {
+ if (i == 10 - 1) {
+ throw new IllegalStateException("match profile task list fail!", e);
+ }
+ TimeUnit.SECONDS.sleep(retryInterval);
+ }
+ }
- final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
- servicesMatcher.verify(tasks);
}
private void verifyServices(LocalDateTime minutesAgo) throws Exception {
diff --git a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/pom.xml b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/pom.xml
index a1901bc..b569e4f 100644
--- a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/pom.xml
+++ b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/pom.xml
@@ -95,6 +95,7 @@
<INSTRUMENTED_SERVICE_1_OPTS>
-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
-DSW_AGENT_NAME=${provider.name}
+ -DSW_AGENT_PROFILE_ACTIVE=true
-Dserver.port=9090
</INSTRUMENTED_SERVICE_1_OPTS>
</env>
diff --git a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/docker/application.yml b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/docker/application.yml
index 343c5cf..739246c 100644
--- a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/docker/application.yml
+++ b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/docker/application.yml
@@ -126,6 +126,8 @@ receiver-clr:
default:
#receiver-so11y:
# default:
+receiver-profile:
+ default:
service-mesh:
default:
bufferPath: ${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
diff --git a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index a82b47b..cab6dff 100644
--- a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++ b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -55,6 +55,7 @@ import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@@ -66,7 +67,7 @@ public class ProfileVerificationITCase {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileVerificationITCase.class);
private final RestTemplate restTemplate = new RestTemplate();
- private final int retryInterval = 30;
+ private final int retryInterval = 10;
private ProfileClient profileClient;
private String instrumentedServiceUrl;
@@ -74,10 +75,11 @@ public class ProfileVerificationITCase {
@Before
public void setUp() {
final String swWebappHost = System.getProperty("sw.webapp.host", "127.0.0.1");
-// final String swWebappPort = System.getProperty("sw.webapp.port", "32783");
+ // final String swWebappPort = System.getProperty("sw.webapp.port", "32783");
final String swWebappPort = System.getProperty("sw.webapp.port", "12800");
final String instrumentedServiceHost = System.getProperty("service.host", "127.0.0.1");
final String instrumentedServicePort = System.getProperty("service.port", "32782");
+ // final String instrumentedServicePort = System.getProperty("service.port", "9090");
profileClient = new ProfileClient(swWebappHost, swWebappPort);
instrumentedServiceUrl = "http://" + instrumentedServiceHost + ":" + instrumentedServicePort;
}
@@ -134,7 +136,8 @@ public class ProfileVerificationITCase {
.duration(5)
.startTime(-1)
.minDurationThreshold(10)
- .dumpPeriod(10).build();
+ .dumpPeriod(10)
+ .maxSamplingCount(5).build();
// verify create task
final ProfileTaskCreationResult creationResult = profileClient.createProfileTask(creationRequest);
@@ -143,19 +146,30 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
- // verify get task list
- final ProfileTasks tasks = profileClient.getProfileTaskList(
- new ProfileTaskQuery()
- .serviceId(creationRequest.getServiceId())
- .endpointName("")
- );
- LOGGER.info("get profile task list: {}", tasks);
+ // verify get task list and logs
+ for (int i = 0; i < 10; i++) {
+ try {
+ final ProfileTasks tasks = profileClient.getProfileTaskList(
+ new ProfileTaskQuery()
+ .serviceId(creationRequest.getServiceId())
+ .endpointName("")
+ );
+ LOGGER.info("get profile task list: {}", tasks);
- InputStream expectedInputStream =
- new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+ InputStream expectedInputStream =
+ new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+
+ final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
+ servicesMatcher.verify(tasks);
+ break;
+ } catch (Throwable e) {
+ if (i == 10 - 1) {
+ throw new IllegalStateException("match profile task list fail!", e);
+ }
+ TimeUnit.SECONDS.sleep(retryInterval);
+ }
+ }
- final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
- servicesMatcher.verify(tasks);
}
private void verifyServices(LocalDateTime minutesAgo) throws Exception {
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/ProfileClient.java b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/ProfileClient.java
index bb53584..d01c0a3 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/ProfileClient.java
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/ProfileClient.java
@@ -59,7 +59,8 @@ public class ProfileClient extends SimpleQueryClient {
.replace("{duration}", String.valueOf(creationRequest.getDuration()))
.replace("{startTime}", String.valueOf(creationRequest.getStartTime()))
.replace("{minDurationThreshold}", String.valueOf(creationRequest.getMinDurationThreshold()))
- .replace("{dumpPeriod}", String.valueOf(creationRequest.getDumpPeriod()));
+ .replace("{dumpPeriod}", String.valueOf(creationRequest.getDumpPeriod()))
+ .replace("{maxSamplingCount}", String.valueOf(creationRequest.getMaxSamplingCount()));
final ResponseEntity<GQLResponse<ProfileTaskCreationResultWrapper>> responseEntity = restTemplate.exchange(
new RequestEntity<>(queryString, HttpMethod.POST, URI.create(endpointUrl)),
new ParameterizedTypeReference<GQLResponse<ProfileTaskCreationResultWrapper>>() {
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
index 78323c9..3b82c07 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/creation/ProfileTaskCreationRequest.java
@@ -38,5 +38,6 @@ public class ProfileTaskCreationRequest {
private int duration;
private int minDurationThreshold;
private int dumpPeriod;
+ private int maxSamplingCount;
}
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java
index 3485124..efee714 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java
@@ -21,6 +21,8 @@ import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import java.util.List;
+
/**
* Profile task bean for e2e GraphQL test result
*
@@ -38,5 +40,8 @@ public class ProfileTask {
private String duration;
private String minDurationThreshold;
private String dumpPeriod;
+ private String maxSamplingCount;
+
+ private List<ProfileTaskLog> logs;
}
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskLog.java
similarity index 78%
copy from test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java
copy to test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskLog.java
index 3485124..4c4b0b3 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskLog.java
@@ -15,6 +15,7 @@
* limitations under the License.
*
*/
+
package org.apache.skywalking.e2e.profile.query;
import lombok.Getter;
@@ -22,21 +23,16 @@ import lombok.Setter;
import lombok.ToString;
/**
- * Profile task bean for e2e GraphQL test result
- *
* @author MrPro
*/
@Setter
@Getter
@ToString
-public class ProfileTask {
+public class ProfileTaskLog {
private String id;
- private String serviceId;
- private String endpointName;
- private String startTime;
- private String duration;
- private String minDurationThreshold;
- private String dumpPeriod;
+ private String instanceId;
+ private String operationType;
+ private String operationTime;
}
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskLogMatcher.java
similarity index 63%
copy from test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java
copy to test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskLogMatcher.java
index 3485124..591ffa7 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTask.java
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskLogMatcher.java
@@ -15,28 +15,30 @@
* limitations under the License.
*
*/
+
package org.apache.skywalking.e2e.profile.query;
import lombok.Getter;
import lombok.Setter;
-import lombok.ToString;
+import org.apache.skywalking.e2e.verification.AbstractMatcher;
/**
- * Profile task bean for e2e GraphQL test result
- *
* @author MrPro
*/
@Setter
@Getter
-@ToString
-public class ProfileTask {
+public class ProfileTaskLogMatcher extends AbstractMatcher<ProfileTaskLog> {
private String id;
- private String serviceId;
- private String endpointName;
- private String startTime;
- private String duration;
- private String minDurationThreshold;
- private String dumpPeriod;
+ private String instanceId;
+ private String operationType;
+ private String operationTime;
+ @Override
+ public void verify(ProfileTaskLog profileTaskLog) {
+ doVerify(id, profileTaskLog.getId());
+ doVerify(instanceId, profileTaskLog.getInstanceId());
+ doVerify(operationType, profileTaskLog.getOperationType());
+ doVerify(operationTime, profileTaskLog.getOperationTime());
+ }
}
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskMatcher.java b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskMatcher.java
index f5c420c..17c2c3d 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskMatcher.java
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/java/org/apache/skywalking/e2e/profile/query/ProfileTaskMatcher.java
@@ -21,6 +21,9 @@ package org.apache.skywalking.e2e.profile.query;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.e2e.verification.AbstractMatcher;
+import org.assertj.core.api.Assertions;
+
+import java.util.List;
/**
* @author MrPro
@@ -36,6 +39,9 @@ public class ProfileTaskMatcher extends AbstractMatcher<ProfileTask> {
private String duration;
private String minDurationThreshold;
private String dumpPeriod;
+ private String maxSamplingCount;
+
+ private List<ProfileTaskLogMatcher> logs;
@Override
public void verify(ProfileTask task) {
@@ -46,6 +52,13 @@ public class ProfileTaskMatcher extends AbstractMatcher<ProfileTask> {
doVerify(duration, task.getDuration());
doVerify(minDurationThreshold, task.getMinDurationThreshold());
doVerify(dumpPeriod, task.getDumpPeriod());
+
+ // verify logs
+ Assertions.assertThat(task.getLogs()).hasSameSizeAs(this.logs);
+ int size = this.getLogs().size();
+ for (int i = 0; i < size; i++) {
+ this.getLogs().get(i).verify(task.getLogs().get(i));
+ }
}
}
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
index 67afb7b..2aac5b2 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
@@ -22,3 +22,9 @@ tasks:
duration: gt 0
minDurationThreshold: gt 0
dumpPeriod: gt 0
+ maxSamplingCount: gt 0
+ logs:
+ - id: not null
+ instanceId: gt 0
+ operationType: NOTIFIED
+ operationTime: gt 0
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/getProfileTaskList.gql b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/getProfileTaskList.gql
index c112924..4ef3f7c 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/getProfileTaskList.gql
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/getProfileTaskList.gql
@@ -24,6 +24,13 @@
duration: duration
minDurationThreshold: minDurationThreshold
dumpPeriod: dumpPeriod
+ maxSamplingCount: maxSamplingCount
+ logs {
+ id
+ instanceId
+ operationType
+ operationTime
+ }
}
}",
"variables": {
diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/profileTaskCreation.gql b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/profileTaskCreation.gql
index 8aa5028..5f92b95 100644
--- a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/profileTaskCreation.gql
+++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/profileTaskCreation.gql
@@ -28,7 +28,8 @@
"duration": {duration},
"startTime": {startTime},
"minDurationThreshold": {minDurationThreshold},
- "dumpPeriod": {dumpPeriod}
+ "dumpPeriod": {dumpPeriod},
+ "maxSamplingCount": {maxSamplingCount}
}
}
}