You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/13 23:30:53 UTC
[skywalking] branch master updated: Update the eBPF Profiling task as service level (#8840)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new db3c1da10d Update the eBPF Profiling task as service level (#8840)
db3c1da10d is described below
commit db3c1da10d448113bbfd29a69b1d4ad77eda5fe6
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Apr 14 07:30:40 2022 +0800
Update the eBPF Profiling task as service level (#8840)
---
apm-protocol/apm-network/src/main/proto | 2 +-
docs/en/changes/changes.md | 2 +
.../analysis/manual/process/ProcessDispatcher.java | 10 ++
...Dispatcher.java => ProcessLabelDispatcher.java} | 31 ++---
.../analysis/manual/process/ProcessTraffic.java | 13 +++
.../manual/process/ServiceLabelRecord.java | 128 +++++++++++++++++++++
.../oap/server/core/command/CommandService.java | 4 +-
.../ebpf/EBPFProfilingMutationService.java | 76 ++++++------
.../profiling/ebpf/EBPFProfilingQueryService.java | 73 ++++++++++--
.../ebpf/analyze/EBPFProfilingAnalyzer.java | 4 +-
.../ebpf/storage/EBPFProfilingTaskRecord.java | 33 ++----
.../server/core/query/MetadataQueryService.java | 27 ++++-
.../query/input/EBPFProfilingTaskCondition.java | 37 ------
.../EBPFProfilingTaskFixedTimeCreationRequest.java | 8 +-
.../query/type/EBPFProfilingProcessFinderType.java | 63 ----------
.../server/core/query/type/EBPFProfilingTask.java | 8 +-
.../EBPFProfilingTaskPrepare.java} | 17 +--
.../oap/server/core/query/type/Process.java | 2 +
.../oap/server/core/query/type/event/Event.java | 3 +
.../core/query/type/event/EventQueryCondition.java | 2 +
.../oap/server/core/source/DefaultScopeDefine.java | 1 +
.../skywalking/oap/server/core/source/Process.java | 5 +
.../source/{Process.java => ServiceLabel.java} | 40 ++-----
.../oap/server/core/storage/StorageModule.java | 4 +-
.../profiling/ebpf/IEBPFProfilingDataDAO.java | 4 +-
.../profiling/ebpf/IEBPFProfilingTaskDAO.java | 4 +-
...ingProcessFinder.java => IServiceLabelDAO.java} | 25 ++--
.../core/storage/query/IMetadataQueryDAO.java | 13 ++-
.../ebpf/analyze/EBPFProfilingAnalyzeContext.java | 2 +-
.../resolver/EBPFProcessProfilingQuery.java | 22 ++--
.../query/graphql/resolver/MetadataQueryV2.java | 8 +-
.../src/main/resources/query-protocol | 2 +-
.../handler/EBPFProcessServiceHandler.java | 27 +++++
.../handler/EBPFProfilingServiceHandler.java | 31 ++++-
.../StorageModuleElasticsearchProvider.java | 4 +
.../query/EBPFProfilingDataEsDAO.java | 4 +-
.../query/EBPFProfilingTaskEsDAO.java | 59 +++++-----
.../elasticsearch/query/MetadataQueryEsDAO.java | 49 +++++++-
.../elasticsearch/query/ServiceLabelEsDAO.java | 60 ++++++++++
.../plugin/influxdb/InfluxStorageProvider.java | 3 +
.../influxdb/query/EBPFProfilingDataQuery.java | 7 +-
.../influxdb/query/EBPFProfilingTaskQuery.java | 51 +++-----
.../plugin/influxdb/query/MetadataQuery.java | 41 ++++++-
.../plugin/influxdb/query/ServiceLabelQuery.java | 68 +++++++++++
.../storage/plugin/iotdb/IoTDBStorageProvider.java | 3 +
.../iotdb/query/IoTDBEBPFProfilingDataDAO.java | 7 +-
.../iotdb/query/IoTDBEBPFProfilingTaskDAO.java | 42 +++----
.../plugin/iotdb/query/IoTDBMetadataQueryDAO.java | 49 +++++++-
...DataDAO.java => IoTDBServiceLabelQueryDAO.java} | 42 +++----
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 3 +
.../plugin/jdbc/h2/dao/H2EBPFProfilingDataDAO.java | 21 +++-
.../plugin/jdbc/h2/dao/H2EBPFProfilingTaskDAO.java | 38 ++----
.../plugin/jdbc/h2/dao/H2MetadataQueryDAO.java | 68 +++++++++--
.../plugin/jdbc/h2/dao/H2ServiceLabelQueryDAO.java | 62 ++++++++++
.../plugin/jdbc/mysql/MySQLStorageProvider.java | 3 +
.../jdbc/postgresql/PostgreSQLStorageProvider.java | 3 +
.../plugin/jdbc/tidb/TiDBStorageProvider.java | 3 +
pom.xml | 1 +
.../e2e-v2/cases/profiling/ebpf/docker-compose.yml | 1 +
...ng-task-list.yml => process-estimate-scale.yml} | 16 +--
.../cases/profiling/ebpf/expected/process.yml | 5 +
...-task-list.yml => profiling-create-prepare.yml} | 21 +---
.../ebpf/expected/profiling-schedule-list.yml | 5 +
.../ebpf/expected/profiling-task-list.yml | 10 +-
.../cases/profiling/ebpf/profiling-cases.yaml | 26 ++---
test/e2e-v2/script/env | 4 +-
66 files changed, 995 insertions(+), 515 deletions(-)
diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto
index 6882545998..2981c652ca 160000
--- a/apm-protocol/apm-network/src/main/proto
+++ b/apm-protocol/apm-network/src/main/proto
@@ -1 +1 @@
-Subproject commit 68825459980d14702de8fb2303c7997624129991
+Subproject commit 2981c652ca6c5125d71bb104dbe441abe3d8a2d4
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 92ce648544..a16e14829b 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -13,6 +13,8 @@
* Support BanyanDB global index for entities. Log and Segment record entities declare this new feature.
* Remove unnecessary analyzer settings in columns of templates. Many were added due to analyzer's default value.
* Simplify the Kafka Fetch configuration in cluster mode.
+* [Breaking Change] Update the eBPF Profiling task to the service level,
+ please delete index/table: `ebpf_profiling_task`, `process_traffic`.
#### UI
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java
index 043338fa3b..5ffb3c7a7b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java
@@ -18,11 +18,16 @@
package org.apache.skywalking.oap.server.core.analysis.manual.process;
+import com.google.gson.Gson;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.Process;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
public class ProcessDispatcher implements SourceDispatcher<Process> {
+ private static final Gson GSON = new Gson();
+
@Override
public void dispatch(Process source) {
final ProcessTraffic traffic = new ProcessTraffic();
@@ -30,6 +35,11 @@ public class ProcessDispatcher implements SourceDispatcher<Process> {
traffic.setInstanceId(source.getInstanceId());
traffic.setName(source.getName());
traffic.setLayer(source.getLayer().value());
+ if (CollectionUtils.isNotEmpty(source.getLabels())) {
+ traffic.setLabelsJson(GSON.toJson(source.getLabels()));
+ } else {
+ traffic.setLabelsJson(Const.EMPTY_STRING);
+ }
traffic.setAgentId(source.getAgentId());
traffic.setProperties(source.getProperties());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessLabelDispatcher.java
similarity index 52%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessLabelDispatcher.java
index 043338fa3b..ae812657c5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessLabelDispatcher.java
@@ -6,39 +6,28 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.skywalking.oap.server.core.analysis.manual.process;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
-import org.apache.skywalking.oap.server.core.source.Process;
+import org.apache.skywalking.oap.server.core.source.ServiceLabel;
-public class ProcessDispatcher implements SourceDispatcher<Process> {
+public class ProcessLabelDispatcher implements SourceDispatcher<ServiceLabel> {
@Override
- public void dispatch(Process source) {
- final ProcessTraffic traffic = new ProcessTraffic();
- traffic.setServiceId(source.getServiceId());
- traffic.setInstanceId(source.getInstanceId());
- traffic.setName(source.getName());
- traffic.setLayer(source.getLayer().value());
-
- traffic.setAgentId(source.getAgentId());
- traffic.setProperties(source.getProperties());
- if (source.getDetectType() != null) {
- traffic.setDetectType(source.getDetectType().value());
- }
-
- traffic.setTimeBucket(source.getTimeBucket());
- traffic.setLastPingTimestamp(source.getTimeBucket());
- MetricsStreamProcessor.getInstance().in(traffic);
+ public void dispatch(ServiceLabel source) {
+ final ServiceLabelRecord record = new ServiceLabelRecord();
+ record.setServiceId(source.getServiceId());
+ record.setLabel(source.getLabel());
+ record.setTimeBucket(source.getTimeBucket());
+ MetricsStreamProcessor.getInstance().in(record);
}
-}
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
index 0380e1ff45..51cc77aa58 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
@@ -56,6 +56,7 @@ public class ProcessTraffic extends Metrics {
public static final String PROPERTIES = "properties";
public static final String LAST_PING_TIME_BUCKET = "last_ping";
public static final String DETECT_TYPE = "detect_type";
+ public static final String LABELS_JSON = "labels_json";
private static final Gson GSON = new Gson();
@@ -103,6 +104,11 @@ public class ProcessTraffic extends Metrics {
@Column(columnName = PROPERTIES, storageOnly = true, length = 50000)
private JsonObject properties;
+ @Setter
+ @Getter
+ @Column(columnName = LABELS_JSON, storageOnly = true, length = 500)
+ private String labelsJson;
+
@Override
public boolean combine(Metrics metrics) {
final ProcessTraffic processTraffic = (ProcessTraffic) metrics;
@@ -116,6 +122,9 @@ public class ProcessTraffic extends Metrics {
if (processTraffic.getDetectType() > 0) {
this.detectType = processTraffic.getDetectType();
}
+ if (StringUtil.isNotEmpty(processTraffic.getLabelsJson())) {
+ this.labelsJson = processTraffic.getLabelsJson();
+ }
return true;
}
@@ -135,6 +144,7 @@ public class ProcessTraffic extends Metrics {
if (StringUtil.isNotEmpty(propString)) {
setProperties(GSON.fromJson(propString, JsonObject.class));
}
+ setLabelsJson(remoteData.getDataStrings(5));
setLastPingTimestamp(remoteData.getDataLongs(0));
setDetectType(remoteData.getDataIntegers(1));
setTimeBucket(remoteData.getDataLongs(1));
@@ -153,6 +163,7 @@ public class ProcessTraffic extends Metrics {
} else {
builder.addDataStrings(GSON.toJson(properties));
}
+ builder.addDataStrings(labelsJson);
builder.addDataLongs(lastPingTimestamp);
builder.addDataIntegers(detectType);
builder.addDataLongs(getTimeBucket());
@@ -180,6 +191,7 @@ public class ProcessTraffic extends Metrics {
if (StringUtil.isNotEmpty(propString)) {
processTraffic.setProperties(GSON.fromJson(propString, JsonObject.class));
}
+ processTraffic.setLabelsJson((String) converter.get(LABELS_JSON));
processTraffic.setLastPingTimestamp(((Number) converter.get(LAST_PING_TIME_BUCKET)).longValue());
processTraffic.setDetectType(((Number) converter.get(DETECT_TYPE)).intValue());
processTraffic.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
@@ -198,6 +210,7 @@ public class ProcessTraffic extends Metrics {
} else {
converter.accept(PROPERTIES, Const.EMPTY_STRING);
}
+ converter.accept(LABELS_JSON, storageData.getLabelsJson());
converter.accept(LAST_PING_TIME_BUCKET, storageData.getLastPingTimestamp());
converter.accept(DETECT_TYPE, storageData.getDetectType());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ServiceLabelRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ServiceLabelRecord.java
new file mode 100644
index 0000000000..c7fb8f21c1
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ServiceLabelRecord.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.process;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_LABEL;
+
+/**
+ * Process have multiple labels, such as tag.
+ * {@link ServiceLabelRecord} could combine them in the service level.
+ * It could help to quickly locate the similar process by the service and label.
+ */
+@Setter
+@Getter
+@Stream(name = ServiceLabelRecord.INDEX_NAME, scopeId = SERVICE_LABEL,
+ builder = ServiceLabelRecord.Builder.class, processor = MetricsStreamProcessor.class)
+@MetricsExtension(supportDownSampling = false, supportUpdate = false)
+@EqualsAndHashCode(of = {
+ "serviceId",
+ "label"
+})
+public class ServiceLabelRecord extends Metrics {
+
+ public static final String INDEX_NAME = "service_label";
+ public static final String SERVICE_ID = "service_id";
+ public static final String LABEL = "label";
+
+ @Column(columnName = SERVICE_ID)
+ private String serviceId;
+ @Column(columnName = LABEL, length = 50)
+ private String label;
+
+ @Override
+ public boolean combine(Metrics metrics) {
+ return true;
+ }
+
+ @Override
+ public void calculate() {
+ }
+
+ @Override
+ public Metrics toHour() {
+ return null;
+ }
+
+ @Override
+ public Metrics toDay() {
+ return null;
+ }
+
+ @Override
+ protected String id0() {
+ return this.serviceId + Const.ID_CONNECTOR + new String(Base64.getEncoder()
+ .encode(label.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void deserialize(RemoteData remoteData) {
+ setServiceId(remoteData.getDataStrings(0));
+ setLabel(remoteData.getDataStrings(1));
+ setTimeBucket(remoteData.getDataLongs(0));
+ }
+
+ @Override
+ public RemoteData.Builder serialize() {
+ final RemoteData.Builder builder = RemoteData.newBuilder();
+ builder.addDataStrings(serviceId);
+ builder.addDataStrings(label);
+ builder.addDataLongs(getTimeBucket());
+ return builder;
+ }
+
+ @Override
+ public int remoteHashCode() {
+ return this.hashCode();
+ }
+
+ public static class Builder implements StorageBuilder<ServiceLabelRecord> {
+
+ @Override
+ public ServiceLabelRecord storage2Entity(Convert2Entity converter) {
+ final ServiceLabelRecord record = new ServiceLabelRecord();
+ record.setServiceId((String) converter.get(SERVICE_ID));
+ record.setLabel((String) converter.get(LABEL));
+ record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ return record;
+ }
+
+ @Override
+ public void entity2Storage(ServiceLabelRecord entity, Convert2Storage converter) {
+ converter.accept(SERVICE_ID, entity.getServiceId());
+ converter.accept(LABEL, entity.getLabel());
+ converter.accept(TIME_BUCKET, entity.getTimeBucket());
+ }
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
index b754050371..d2a28f5c4d 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
@@ -49,13 +49,13 @@ public class CommandService implements Service {
/**
* Used to notify the eBPF Profiling task to the eBPF agent side
*/
- public EBPFProfilingTaskCommand newEBPFProfilingTaskCommand(EBPFProfilingTask task) {
+ public EBPFProfilingTaskCommand newEBPFProfilingTaskCommand(EBPFProfilingTask task, String processId) {
final String serialNumber = UUID.randomUUID().toString();
EBPFProfilingTaskCommand.FixedTrigger fixedTrigger = null;
if (Objects.equals(task.getTriggerType(), EBPFProfilingTriggerType.FIXED_TIME)) {
fixedTrigger = new EBPFProfilingTaskCommand.FixedTrigger(task.getFixedTriggerDuration());
}
- return new EBPFProfilingTaskCommand(serialNumber, task.getTaskId(), task.getProcessId(), task.getCreateTime(),
+ return new EBPFProfilingTaskCommand(serialNumber, task.getTaskId(), processId, task.getCreateTime(),
task.getLastUpdateTime(), task.getTriggerType().name(), fixedTrigger, task.getTargetType().name());
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingMutationService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingMutationService.java
index 14c62991e1..2f7430b159 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingMutationService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingMutationService.java
@@ -18,37 +18,39 @@
package org.apache.skywalking.oap.server.core.profiling.ebpf;
+import com.google.common.base.Joiner;
+import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamProcessor;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
import org.apache.skywalking.oap.server.core.query.input.EBPFProfilingTaskFixedTimeCreationRequest;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTaskCreationResult;
-import org.apache.skywalking.oap.server.core.query.type.Process;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.EBPFProfilingProcessFinder;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
public class EBPFProfilingMutationService implements Service {
+ private static final Gson GSON = new Gson();
public static final int FIXED_TIME_MIN_DURATION = (int) TimeUnit.SECONDS.toSeconds(60);
private final ModuleManager moduleManager;
private IEBPFProfilingTaskDAO processProfilingTaskDAO;
- private IMetadataQueryDAO metadataQueryDAO;
+ private IServiceLabelDAO serviceLabelDAO;
private IEBPFProfilingTaskDAO getProcessProfilingTaskDAO() {
if (processProfilingTaskDAO == null) {
@@ -59,13 +61,13 @@ public class EBPFProfilingMutationService implements Service {
return processProfilingTaskDAO;
}
- private IMetadataQueryDAO getMetadataQueryDAO() {
- if (metadataQueryDAO == null) {
- this.metadataQueryDAO = moduleManager.find(StorageModule.NAME)
+ public IServiceLabelDAO getServiceLabelDAO() {
+ if (serviceLabelDAO == null) {
+ this.serviceLabelDAO = moduleManager.find(StorageModule.NAME)
.provider()
- .getService(IMetadataQueryDAO.class);
+ .getService(IServiceLabelDAO.class);
}
- return metadataQueryDAO;
+ return serviceLabelDAO;
}
/**
@@ -85,16 +87,11 @@ public class EBPFProfilingMutationService implements Service {
// create task
final EBPFProfilingTaskRecord task = new EBPFProfilingTaskRecord();
- task.setProcessFindType(request.getProcessFinder().getFinderType().value());
- if (request.getProcessFinder().getFinderType() == EBPFProfilingProcessFinderType.PROCESS_ID) {
- final Process process = getMetadataQueryDAO().getProcess(request.getProcessFinder().getProcessId());
- if (process == null) {
- return buildError("could not found process");
- }
- task.setServiceId(process.getServiceId());
- task.setInstanceId(process.getInstanceId());
- task.setProcessId(process.getId());
- task.setProcessName(process.getName());
+ task.setServiceId(request.getServiceId());
+ if (CollectionUtils.isNotEmpty(request.getProcessLabels())) {
+ task.setProcessLabelsJson(GSON.toJson(request.getProcessLabels()));
+ } else {
+ task.setProcessLabelsJson(Const.EMPTY_STRING);
}
task.setStartTime(request.getStartTime());
task.setTriggerType(EBPFProfilingTriggerType.FIXED_TIME.value());
@@ -113,18 +110,28 @@ public class EBPFProfilingMutationService implements Service {
}
private String checkCreateRequest(EBPFProfilingTaskFixedTimeCreationRequest request) throws IOException {
- String err = "";
-
- // validate process finder
- if (request.getProcessFinder() == null) {
- return "The process finder could not be null";
- }
- if (request.getProcessFinder().getFinderType() == null) {
- return "The process find type could not be null";
- }
- switch (request.getProcessFinder().getFinderType()) {
- case PROCESS_ID:
- err = requiredNotEmpty(err, "process id", request.getProcessFinder().getProcessId());
+ String err = null;
+
+ err = requiredNotEmpty(err, "service", request.getServiceId());
+
+ // the request label must be legal
+ if (err == null && CollectionUtils.isNotEmpty(request.getProcessLabels())) {
+ final List<String> existingLabels = getServiceLabelDAO().queryAllLabels(request.getServiceId());
+ List<String> notExistLabels = new ArrayList<>(existingLabels.size());
+ for (String processLabel : request.getProcessLabels()) {
+ if (!existingLabels.contains(processLabel)) {
+ notExistLabels.add(processLabel);
+ }
+ }
+ if (notExistLabels.size() > 0) {
+ err = String.format("The service doesn't have processes with label(s) %s.", Joiner.on(", ").join(notExistLabels));
+ } else {
+ final String labelJson = GSON.toJson(request.getProcessLabels());
+ if (labelJson.length() > EBPFProfilingTaskRecord.PROCESS_LABELS_JSON_MAX_LENGTH) {
+ err = String.format("The labels length is bigger than %d, please reduce the labels count",
+ EBPFProfilingTaskRecord.PROCESS_LABELS_JSON_MAX_LENGTH);
+ }
+ }
}
if (err != null) {
return err;
@@ -143,10 +150,7 @@ public class EBPFProfilingMutationService implements Service {
// query exist processes
final List<EBPFProfilingTask> tasks = getProcessProfilingTaskDAO().queryTasks(
- EBPFProfilingProcessFinder.builder()
- .finderType(request.getProcessFinder().getFinderType())
- .processIdList(Arrays.asList(request.getProcessFinder().getProcessId()))
- .build(), request.getTargetType(), calculateStartTime(request), 0);
+ Arrays.asList(request.getServiceId()), request.getTargetType(), calculateStartTime(request), 0);
if (CollectionUtils.isNotEmpty(tasks)) {
return "already have profiling task at this time";
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
index cf39cbd280..05008f6005 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
@@ -18,49 +18,61 @@
package org.apache.skywalking.oap.server.core.profiling.ebpf;
+import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.profiling.ebpf.analyze.EBPFProfilingAnalyzer;
import org.apache.skywalking.oap.server.core.query.input.Duration;
-import org.apache.skywalking.oap.server.core.query.input.EBPFProfilingTaskCondition;
import org.apache.skywalking.oap.server.core.query.type.Attribute;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeTimeRange;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingSchedule;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTaskPrepare;
import org.apache.skywalking.oap.server.core.query.type.Process;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
-import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.EBPFProfilingProcessFinder;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class EBPFProfilingQueryService implements Service {
+ private static final Gson GSON = new Gson();
+
private final ModuleManager moduleManager;
private final CoreModuleConfig config;
private final StorageModels storageModels;
+ private IMetadataQueryDAO metadataQueryDAO;
+ private IServiceLabelDAO serviceLabelDAO;
private IEBPFProfilingTaskDAO taskDAO;
private IEBPFProfilingScheduleDAO scheduleDAO;
private EBPFProfilingAnalyzer profilingAnalyzer;
@@ -118,13 +130,49 @@ public class EBPFProfilingQueryService implements Service {
return profilingAnalyzer;
}
- public List<EBPFProfilingTask> queryEBPFProfilingTasks(EBPFProfilingTaskCondition condition) throws IOException {
- return getTaskDAO().queryTasks(EBPFProfilingProcessFinder.builder()
- .finderType(condition.getFinderType())
- .serviceId(condition.getServiceId())
- .instanceId(condition.getInstanceId())
- .processIdList(Arrays.asList(condition.getProcessId()))
- .build(), null, 0, 0);
+ public IMetadataQueryDAO getMetadataQueryDAO() {
+ if (metadataQueryDAO == null) {
+ metadataQueryDAO = moduleManager.find(StorageModule.NAME)
+ .provider()
+ .getService(IMetadataQueryDAO.class);
+ }
+ return metadataQueryDAO;
+ }
+
+ public IServiceLabelDAO getServiceLabelDAO() {
+ if (serviceLabelDAO == null) {
+ serviceLabelDAO = moduleManager.find(StorageModule.NAME)
+ .provider()
+ .getService(IServiceLabelDAO.class);
+ }
+ return serviceLabelDAO;
+ }
+
+ public EBPFProfilingTaskPrepare queryPrepareCreateEBPFProfilingTaskData(String serviceId) throws IOException {
+ final EBPFProfilingTaskPrepare prepare = new EBPFProfilingTaskPrepare();
+ // query process count in last 10 minutes
+ final long endTimestamp = System.currentTimeMillis();
+ final long startTimestamp = endTimestamp - TimeUnit.MINUTES.toMillis(10);
+ final long processesCount = getMetadataQueryDAO().getProcessesCount(serviceId, null, null,
+ TimeBucket.getTimeBucket(startTimestamp, DownSampling.Minute),
+ TimeBucket.getTimeBucket(endTimestamp, DownSampling.Minute));
+ if (processesCount <= 0) {
+ prepare.setCouldProfiling(false);
+ prepare.setProcessLabels(Collections.emptyList());
+ return prepare;
+ }
+ prepare.setCouldProfiling(true);
+ final List<String> processLabels = getServiceLabelDAO().queryAllLabels(serviceId);
+ if (processLabels != null && !processLabels.isEmpty()) {
+ prepare.setProcessLabels(processLabels.stream().distinct().collect(Collectors.toList()));
+ } else {
+ prepare.setProcessLabels(Collections.emptyList());
+ }
+ return prepare;
+ }
+
+ public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId) throws IOException {
+ return getTaskDAO().queryTasks(Arrays.asList(serviceId), null, 0, 0);
}
public List<EBPFProfilingSchedule> queryEBPFProfilingSchedules(String taskId, Duration duration) throws IOException {
@@ -147,8 +195,8 @@ public class EBPFProfilingQueryService implements Service {
return schedules;
}
- public EBPFProfilingAnalyzation getEBPFProfilingAnalyzation(String taskId, List<EBPFProfilingAnalyzeTimeRange> timeRanges) throws IOException {
- return getProfilingAnalyzer().analyze(taskId, timeRanges);
+ public EBPFProfilingAnalyzation getEBPFProfilingAnalyzation(List<String> scheduleIdList, List<EBPFProfilingAnalyzeTimeRange> timeRanges) throws IOException {
+ return getProfilingAnalyzer().analyze(scheduleIdList, timeRanges);
}
private Process convertProcess(ProcessTraffic traffic) {
@@ -169,6 +217,9 @@ public class EBPFProfilingQueryService implements Service {
process.getAttributes().add(new Attribute(key, traffic.getProperties().get(key).getAsString()));
}
}
+ if (StringUtil.isNotEmpty(traffic.getLabelsJson())) {
+ process.getLabels().addAll(GSON.<List<String>>fromJson(traffic.getLabelsJson(), ArrayList.class));
+ }
return process;
}
}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
index ee3359ac29..41f11635ab 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
@@ -69,7 +69,7 @@ public class EBPFProfilingAnalyzer {
/**
* search data and analyze
*/
- public EBPFProfilingAnalyzation analyze(String taskId, List<EBPFProfilingAnalyzeTimeRange> ranges) throws IOException {
+ public EBPFProfilingAnalyzation analyze(List<String> scheduleIdList, List<EBPFProfilingAnalyzeTimeRange> ranges) throws IOException {
EBPFProfilingAnalyzation analyzation = new EBPFProfilingAnalyzation();
String timeRangeValidate = validateIsOutOfTimeRangeLimit(ranges);
@@ -82,7 +82,7 @@ public class EBPFProfilingAnalyzer {
long queryDataMaxTimestamp = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(maxQueryTimeoutInSecond);
final Stream<EBPFProfilingStack> stackStream = buildTimeRanges(ranges).parallelStream().map(r -> {
try {
- return fetchDataThreadPool.submit(() -> getDataDAO().queryData(taskId, r.getMinTime(), r.getMaxTime()))
+ return fetchDataThreadPool.submit(() -> getDataDAO().queryData(scheduleIdList, r.getMinTime(), r.getMaxTime()))
.get(queryDataMaxTimestamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn(e.getMessage(), e);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
index c7a79cd84c..fc55ab850f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
@@ -24,7 +24,6 @@ import lombok.Data;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamProcessor;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
@@ -41,13 +40,9 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EB
@Stream(name = EBPFProfilingTaskRecord.INDEX_NAME, scopeId = EBPF_PROFILING_TASK,
builder = EBPFProfilingTaskRecord.Builder.class, processor = NoneStreamProcessor.class)
public class EBPFProfilingTaskRecord extends NoneStream {
-
public static final String INDEX_NAME = "ebpf_profiling_task";
- public static final String PROCESS_FIND_TYPE = "process_find_type";
public static final String SERVICE_ID = "service_id";
- public static final String INSTANCE_ID = "instance_id";
- public static final String PROCESS_ID = "process_id";
- public static final String PROCESS_NAME = "process_name";
+ public static final String PROCESS_LABELS_JSON = "process_labels_json";
public static final String START_TIME = "start_time";
public static final String TRIGGER_TYPE = "trigger_type";
public static final String FIXED_TRIGGER_DURATION = "fixed_trigger_duration";
@@ -55,16 +50,12 @@ public class EBPFProfilingTaskRecord extends NoneStream {
public static final String CREATE_TIME = "create_time";
public static final String LAST_UPDATE_TIME = "last_update_time";
- @Column(columnName = PROCESS_FIND_TYPE)
- private int processFindType = EBPFProfilingProcessFinderType.UNKNOWN.value();
+ public static final int PROCESS_LABELS_JSON_MAX_LENGTH = 1000;
+
@Column(columnName = SERVICE_ID)
private String serviceId;
- @Column(columnName = INSTANCE_ID, length = 600)
- private String instanceId;
- @Column(columnName = PROCESS_ID, length = 600)
- private String processId;
- @Column(columnName = PROCESS_NAME, length = 500)
- private String processName;
+ @Column(columnName = PROCESS_LABELS_JSON, length = PROCESS_LABELS_JSON_MAX_LENGTH)
+ private String processLabelsJson;
@Column(columnName = START_TIME)
private long startTime;
@Column(columnName = TRIGGER_TYPE)
@@ -81,9 +72,9 @@ public class EBPFProfilingTaskRecord extends NoneStream {
@Override
public String id() {
return Hashing.sha256().newHasher()
- .putString(processId, Charsets.UTF_8)
+ .putString(serviceId, Charsets.UTF_8)
+ .putString(processLabelsJson, Charsets.UTF_8)
.putLong(createTime)
- .putInt(processFindType)
.hash().toString();
}
@@ -92,11 +83,8 @@ public class EBPFProfilingTaskRecord extends NoneStream {
@Override
public EBPFProfilingTaskRecord storage2Entity(final Convert2Entity converter) {
final EBPFProfilingTaskRecord record = new EBPFProfilingTaskRecord();
- record.setProcessFindType(((Number) converter.get(PROCESS_FIND_TYPE)).intValue());
record.setServiceId((String) converter.get(SERVICE_ID));
- record.setInstanceId((String) converter.get(INSTANCE_ID));
- record.setProcessId((String) converter.get(PROCESS_ID));
- record.setProcessName((String) converter.get(PROCESS_NAME));
+ record.setProcessLabelsJson((String) converter.get(PROCESS_LABELS_JSON));
record.setTriggerType(((Number) converter.get(TRIGGER_TYPE)).intValue());
record.setStartTime(((Number) converter.get(START_TIME)).longValue());
record.setFixedTriggerDuration(((Number) converter.get(FIXED_TRIGGER_DURATION)).longValue());
@@ -109,11 +97,8 @@ public class EBPFProfilingTaskRecord extends NoneStream {
@Override
public void entity2Storage(final EBPFProfilingTaskRecord storageData, final Convert2Storage converter) {
- converter.accept(PROCESS_FIND_TYPE, storageData.getProcessFindType());
converter.accept(SERVICE_ID, storageData.getServiceId());
- converter.accept(INSTANCE_ID, storageData.getInstanceId());
- converter.accept(PROCESS_ID, storageData.getProcessId());
- converter.accept(PROCESS_NAME, storageData.getProcessName());
+ converter.accept(PROCESS_LABELS_JSON, storageData.getProcessLabelsJson());
converter.accept(TRIGGER_TYPE, storageData.getTriggerType());
converter.accept(START_TIME, storageData.getStartTime());
converter.accept(FIXED_TRIGGER_DURATION, storageData.getFixedTriggerDuration());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
index 5cd017a9ab..50d95cef83 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
@@ -20,16 +20,19 @@ package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.Endpoint;
import org.apache.skywalking.oap.server.core.query.type.EndpointInfo;
import org.apache.skywalking.oap.server.core.query.type.Process;
@@ -38,6 +41,7 @@ import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
public class MetadataQueryService implements org.apache.skywalking.oap.server.library.module.Service {
@@ -98,11 +102,9 @@ public class MetadataQueryService implements org.apache.skywalking.oap.server.li
return endpointInfo;
}
- public List<Process> listProcesses(final String serviceId, final String instanceId) throws IOException {
- if (StringUtils.isEmpty(serviceId) && StringUtils.isEmpty(instanceId)) {
- return Collections.emptyList();
- }
- return getMetadataQueryDAO().listProcesses(serviceId, instanceId, null);
+ public List<Process> listProcesses(final Duration duration, final String instanceId) throws IOException {
+ return getMetadataQueryDAO().listProcesses(null, instanceId, null,
+ duration.getStartTimeBucket(), duration.getEndTimeBucket());
}
public Process getProcess(String processId) throws IOException {
@@ -112,6 +114,19 @@ public class MetadataQueryService implements org.apache.skywalking.oap.server.li
return getMetadataQueryDAO().getProcess(processId);
}
+ public Long estimateProcessScale(String serviceId, List<String> labels) throws IOException {
+ if (StringUtils.isEmpty(serviceId)) {
+ return 0L;
+ }
+ final long endTimestamp = System.currentTimeMillis();
+ final long startTimestamp = endTimestamp - TimeUnit.MINUTES.toMillis(10);
+ final List<Process> processes = getMetadataQueryDAO().listProcesses(serviceId, null, null,
+ TimeBucket.getTimeBucket(startTimestamp, DownSampling.Minute), TimeBucket.getTimeBucket(endTimestamp, DownSampling.Minute));
+ return CollectionUtils.isEmpty(processes) ?
+ 0L :
+ processes.stream().filter(p -> p.getLabels().containsAll(labels)).count();
+ }
+
private List<Service> combineServices(List<Service> services) {
return new ArrayList<>(services.stream()
.peek(service -> {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingTaskCondition.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingTaskCondition.java
deleted file mode 100644
index 9aa390b8f3..0000000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingTaskCondition.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.query.input;
-
-import lombok.Data;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
-
-/**
- * eBPF profiling task query condition
- */
-@Data
-public class EBPFProfilingTaskCondition {
- // the process finder type of profiling task
- private EBPFProfilingProcessFinderType finderType;
- // service ID of process which need profiling
- private String serviceId;
- // instance ID of process which need profiling
- private String instanceId;
- // process ID of process which need profiling
- private String processId;
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingTaskFixedTimeCreationRequest.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingTaskFixedTimeCreationRequest.java
index 34eec73ff1..9bac5504da 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingTaskFixedTimeCreationRequest.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingTaskFixedTimeCreationRequest.java
@@ -23,12 +23,16 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
+import java.util.List;
+
@Data
@AllArgsConstructor
@NoArgsConstructor
public class EBPFProfilingTaskFixedTimeCreationRequest {
- // Define how to find the process
- private EBPFProfilingProcessFinder processFinder;
+ // Define which processes under the service need to be profiling
+ private String serviceId;
+ // Aggregate which processes need to be profiling from labels
+ private List<String> processLabels;
// The task start timestamp(ms), if less than or equal zero means the task starts ASAP
private long startTime;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingProcessFinderType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingProcessFinderType.java
deleted file mode 100644
index 1791394266..0000000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingProcessFinderType.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.query.type;
-
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * The eBPF Process Profiling finder type define how to find the process
- */
-public enum EBPFProfilingProcessFinderType {
-
- UNKNOWN(0),
-
- /**
- * Find process by id
- */
- PROCESS_ID(1)
- ;
-
- private final int value;
- private static final Map<Integer, EBPFProfilingProcessFinderType> DICTIONARY = new HashMap<>();
-
- static {
- Arrays.stream(EBPFProfilingProcessFinderType.values()).collect(Collectors.toMap(EBPFProfilingProcessFinderType::value, type -> type)).forEach(DICTIONARY::put);
- }
-
- EBPFProfilingProcessFinderType(int value) {
- this.value = value;
- }
-
- public int value() {
- return value;
- }
-
- public static EBPFProfilingProcessFinderType valueOf(int value) {
- EBPFProfilingProcessFinderType type = DICTIONARY.get(value);
- if (type == null) {
- throw new UnexpectedException("Unknown EBPFProcessProfilingFinderType value");
- }
- return type;
- }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingTask.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingTask.java
index 369a978dd2..0b3433af29 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingTask.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingTask.java
@@ -22,17 +22,15 @@ import lombok.Data;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
+import java.util.List;
+
@Data
public class EBPFProfilingTask {
private String taskId;
- private EBPFProfilingProcessFinderType processFinderType;
private String serviceId;
private String serviceName;
- private String instanceId;
- private String instanceName;
- private String processId;
- private String processName;
+ private List<String> processLabels;
private long taskStartTime;
private EBPFProfilingTriggerType triggerType;
private long fixedTriggerDuration;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingProcessFinder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingTaskPrepare.java
similarity index 65%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingProcessFinder.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingTaskPrepare.java
index c9e03db91b..09951ae1cf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/EBPFProfilingProcessFinder.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/EBPFProfilingTaskPrepare.java
@@ -16,16 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.core.query.input;
+package org.apache.skywalking.oap.server.core.query.type;
import lombok.Data;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
-// Define how to find process which need to be profiling
+import java.util.List;
+
+/**
+ * eBPF Profiling prepare to create task needs data
+ */
@Data
-public class EBPFProfilingProcessFinder {
- // the way to address the target process
- private EBPFProfilingProcessFinderType finderType;
- // appoint process ID when use the PROCESS_ID finder type
- private String processId;
+public class EBPFProfilingTaskPrepare {
+ private boolean couldProfiling;
+ private List<String> processLabels;
}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Process.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Process.java
index ac6af2833f..12a11c56d7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Process.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Process.java
@@ -47,8 +47,10 @@ public class Process {
@Setter
private String detectType;
private final List<Attribute> attributes;
+ private final List<String> labels;
public Process() {
this.attributes = new ArrayList<>();
+ this.labels = new ArrayList<>();
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java
index 51391c8a0c..e279de1f9a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Data;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
@@ -47,6 +48,8 @@ public class Event {
private long endTime;
+ private String layer = Layer.UNDEFINED.name();
+
public void setParameters(final List<KeyValue> parameters) {
this.parameters = parameters;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/EventQueryCondition.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/EventQueryCondition.java
index be886a7fc1..33327fb6d9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/EventQueryCondition.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/EventQueryCondition.java
@@ -43,5 +43,7 @@ public class EventQueryCondition {
private Order order;
+ private String layer;
+
private Pagination paging;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 80dbd38f07..d3cc706128 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -100,6 +100,7 @@ public class DefaultScopeDefine {
public static final int EBPF_PROFILING_TASK = 46;
public static final int EBPF_PROFILING_SCHEDULE = 47;
public static final int EBPF_PROFILING_DATA = 48;
+ public static final int SERVICE_LABEL = 49;
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java
index e0513fdc09..4d4224deb0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java
@@ -25,6 +25,8 @@ import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
+import java.util.List;
+
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS;
@ScopeDeclaration(id = PROCESS, name = "Process")
@@ -73,6 +75,9 @@ public class Process extends Source {
@Getter
@Setter
private JsonObject properties;
+ @Setter
+ @Getter
+ private List<String> labels;
@Override
public void prepare() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceLabel.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceLabel.java
index e0513fdc09..e6971c63c7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceLabel.java
@@ -18,65 +18,49 @@
package org.apache.skywalking.oap.server.core.source;
-import com.google.gson.JsonObject;
import lombok.Getter;
import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.Layer;
-import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
-import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
-@ScopeDeclaration(id = PROCESS, name = "Process")
+import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_LABEL;
+
+@ScopeDeclaration(id = SERVICE_LABEL, name = "ServiceLabel")
@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
-public class Process extends Source {
+public class ServiceLabel extends Source {
private volatile String entityId;
@Override
public int scope() {
- return PROCESS;
+ return SERVICE_LABEL;
}
@Override
public String getEntityId() {
if (entityId == null) {
- entityId = IDManager.ProcessID.buildId(instanceId, name);
+ entityId = serviceId + Const.ID_CONNECTOR + new String(Base64.getEncoder()
+ .encode(label.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
}
return entityId;
}
- @Getter
- private String instanceId;
@Getter
private String serviceId;
- @Getter
- @Setter
- private String name;
- @Getter
- @Setter
- private String serviceName;
- @Getter
- @Setter
- private String instanceName;
- @Getter
@Setter
- private Layer layer;
@Getter
- @Setter
private boolean isServiceNormal;
- @Getter
@Setter
- private String agentId;
@Getter
+ private String serviceName;
@Setter
- private ProcessDetectType detectType;
@Getter
- @Setter
- private JsonObject properties;
+ private String label;
@Override
public void prepare() {
serviceId = IDManager.ServiceID.buildId(serviceName, isServiceNormal);
- instanceId = IDManager.ServiceInstanceID.buildId(serviceId, instanceName);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 19a2dc36bf..8a27326f2a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
@@ -74,7 +75,8 @@ public class StorageModule extends ModuleDefine {
IEventQueryDAO.class,
IEBPFProfilingTaskDAO.class,
IEBPFProfilingScheduleDAO.class,
- IEBPFProfilingDataDAO.class
+ IEBPFProfilingDataDAO.class,
+ IServiceLabelDAO.class
};
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IEBPFProfilingDataDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IEBPFProfilingDataDAO.java
index 2d2055b50d..a6f3d77558 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IEBPFProfilingDataDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IEBPFProfilingDataDAO.java
@@ -30,9 +30,9 @@ import java.util.List;
public interface IEBPFProfilingDataDAO extends DAO {
/**
* list profiling data by task and time
- * @param taskId profiling task id
+ * @param scheduleIdList profiling schedule ID list
* @param beginTime timestamp bigger than or equals
* @param endTime timestamp smaller than
*/
- List<EBPFProfilingDataRecord> queryData(String taskId, long beginTime, long endTime) throws IOException;
+ List<EBPFProfilingDataRecord> queryData(List<String> scheduleIdList, long beginTime, long endTime) throws IOException;
}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IEBPFProfilingTaskDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IEBPFProfilingTaskDAO.java
index 2beb22b06f..1f96b5cb71 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IEBPFProfilingTaskDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IEBPFProfilingTaskDAO.java
@@ -32,12 +32,12 @@ public interface IEBPFProfilingTaskDAO extends DAO {
/**
* list profiling tasks
- * @param finder process finder config
+ * @param serviceIdList which service is belong
* @param targetType profiling task target type
* @param taskStartTime profiling task start timestamp, bigger than or equals
* @param latestUpdateTime profiling task last update timestamp, bigger than
*/
- List<EBPFProfilingTask> queryTasks(EBPFProfilingProcessFinder finder,
+ List<EBPFProfilingTask> queryTasks(List<String> serviceIdList,
EBPFProfilingTargetType targetType,
long taskStartTime, long latestUpdateTime) throws IOException;
}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/EBPFProfilingProcessFinder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IServiceLabelDAO.java
similarity index 66%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/EBPFProfilingProcessFinder.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IServiceLabelDAO.java
index a668f195d6..4f36b73e21 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/EBPFProfilingProcessFinder.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/ebpf/IServiceLabelDAO.java
@@ -18,21 +18,18 @@
package org.apache.skywalking.oap.server.core.storage.profiling.ebpf;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
+import org.apache.skywalking.oap.server.core.storage.DAO;
+import java.io.IOException;
import java.util.List;
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@Builder
-public class EBPFProfilingProcessFinder {
- private EBPFProfilingProcessFinderType finderType;
- private String serviceId;
- private String instanceId;
- private List<String> processIdList;
+/**
+ * Process Service Label Query
+ */
+public interface IServiceLabelDAO extends DAO {
+
+ /**
+ * Query all labels from service
+ */
+ List<String> queryAllLabels(String serviceId) throws IOException;
}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
index 5bc8e8d882..c9f0a8eac3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
@@ -64,7 +64,18 @@ public interface IMetadataQueryDAO extends DAO {
* @param agentId the agent id which reports the process.
* @return list of processes matching the given conditions.
*/
- List<Process> listProcesses(final String serviceId, final String instanceId, final String agentId) throws IOException;
+ List<Process> listProcesses(final String serviceId, final String instanceId, final String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException;
+
+ /**
+ * get the count of processes
+ * @param serviceId the service of the processes.
+ * @param instanceId the service instance of the process.
+ * @param agentId the agent id which reports the process.
+ * @return the size of processes
+ */
+ long getProcessesCount(final String serviceId, final String instanceId, final String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException;
/**
* @param processId the id of the process.
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzeContext.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzeContext.java
index da28c130ca..fbdc2ee4a5 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzeContext.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzeContext.java
@@ -118,7 +118,7 @@ public class EBPFProfilingAnalyzeContext {
}
@Override
- public List<EBPFProfilingDataRecord> queryData(String taskId, long beginTime, long endTime) throws IOException {
+ public List<EBPFProfilingDataRecord> queryData(List<String> taskIdList, long beginTime, long endTime) throws IOException {
final ArrayList<EBPFProfilingDataRecord> records = new ArrayList<>();
for (; beginTime < endTime; beginTime++) {
if (symbols.size() <= (int) beginTime) {
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
index fa89b82bb7..7c7e26869d 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
@@ -22,11 +22,11 @@ import graphql.kickstart.tools.GraphQLQueryResolver;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.profiling.ebpf.EBPFProfilingQueryService;
import org.apache.skywalking.oap.server.core.query.input.Duration;
-import org.apache.skywalking.oap.server.core.query.input.EBPFProfilingTaskCondition;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeTimeRange;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingSchedule;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTaskPrepare;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.StringUtil;
@@ -51,19 +51,25 @@ public class EBPFProcessProfilingQuery implements GraphQLQueryResolver {
return queryService;
}
- public List<EBPFProfilingTask> queryEBPFProfilingTasks(EBPFProfilingTaskCondition query) throws IOException {
- if (query == null || (StringUtil.isEmpty(query.getServiceId()) && StringUtil.isEmpty(query.getInstanceId())
- && StringUtil.isEmpty(query.getProcessId()))) {
- throw new IllegalArgumentException("please provide the task condition");
+ public EBPFProfilingTaskPrepare queryPrepareCreateEBPFProfilingTaskData(String serviceId) throws IOException {
+ if (StringUtil.isEmpty(serviceId)) {
+ throw new IllegalArgumentException("please provide the service id");
}
- return getQueryService().queryEBPFProfilingTasks(query);
+ return getQueryService().queryPrepareCreateEBPFProfilingTaskData(serviceId);
+ }
+
+ public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId) throws IOException {
+ if (StringUtil.isEmpty(serviceId)) {
+ throw new IllegalArgumentException("please provide the service id");
+ }
+ return getQueryService().queryEBPFProfilingTasks(serviceId);
}
public List<EBPFProfilingSchedule> queryEBPFProfilingSchedules(String taskId, Duration duration) throws IOException {
return getQueryService().queryEBPFProfilingSchedules(taskId, duration);
}
- public EBPFProfilingAnalyzation getEBPFProfilingAnalyzation(String taskId, List<EBPFProfilingAnalyzeTimeRange> timeRanges) throws IOException {
- return getQueryService().getEBPFProfilingAnalyzation(taskId, timeRanges);
+ public EBPFProfilingAnalyzation analysisEBPFProfilingResult(List<String> scheduleIdList, List<EBPFProfilingAnalyzeTimeRange> timeRanges) throws IOException {
+ return getQueryService().getEBPFProfilingAnalyzation(scheduleIdList, timeRanges);
}
}
\ No newline at end of file
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQueryV2.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQueryV2.java
index fc6d0b820e..6151c184d0 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQueryV2.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQueryV2.java
@@ -94,14 +94,18 @@ public class MetadataQueryV2 implements GraphQLQueryResolver {
return getMetadataQueryService().getEndpointInfo(endpointId);
}
- public List<Process> listProcesses(final String serviceId, final String instanceId) throws IOException {
- return getMetadataQueryService().listProcesses(serviceId, instanceId);
+ public List<Process> listProcesses(final Duration duration, final String instanceId) throws IOException {
+ return getMetadataQueryService().listProcesses(duration, instanceId);
}
public Process getProcess(final String processId) throws IOException {
return getMetadataQueryService().getProcess(processId);
}
+ public Long estimateProcessScale(String serviceId, List<String> labels) throws IOException {
+ return getMetadataQueryService().estimateProcessScale(serviceId, labels);
+ }
+
public TimeInfo getTimeInfo() {
TimeInfo timeInfo = new TimeInfo();
SimpleDateFormat timezoneFormat = new SimpleDateFormat("ZZZZZZ");
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index 6734397802..4fb447b78d 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit 673439780259838a55a5c1490029eda84556eeee
+Subproject commit 4fb447b78d6e31912cbd8c8ae82cca1fc80861de
diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java
index 7007cb392f..526d1d06e4 100644
--- a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java
@@ -41,13 +41,16 @@ import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDete
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.Process;
+import org.apache.skywalking.oap.server.core.source.ServiceLabel;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate;
import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.util.ArrayList;
+import java.util.List;
public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProcessServiceImplBase implements GRPCHandler {
@@ -83,6 +86,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
processes.stream().forEach(e -> {
sourceReceiver.receive(e._1);
builder.addProcesses(e._2);
+ handleServiceLabels(e._1.getServiceName(), e._1.isServiceNormal(), e._1.getLabels(), e._1.getTimeBucket());
});
responseObserver.onNext(builder.build());
@@ -106,6 +110,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
processUpdate.setLayer(layer);
processUpdate.setServiceNormal(true);
processUpdate.setName(entity.getProcessName());
+ processUpdate.setLabels(entity.getLabelsList());
processUpdate.setTimeBucket(timeBucket);
sourceReceiver.receive(processUpdate);
@@ -123,6 +128,9 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
serviceMeta.setTimeBucket(timeBucket);
serviceMeta.setLayer(layer);
sourceReceiver.receive(serviceMeta);
+
+ // service label
+ handleServiceLabels(serviceName, true, processUpdate.getLabels(), timeBucket);
});
responseObserver.onNext(Commands.newBuilder().build());
@@ -147,6 +155,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
properties.addProperty(ProcessTraffic.PropertyUtil.PID, hostProcess.getPid());
properties.addProperty(ProcessTraffic.PropertyUtil.COMMAND_LINE, hostProcess.getCmd());
process.setProperties(properties);
+ process.setLabels(hostProcess.getEntity().getLabelsList());
// timestamp
process.setTimeBucket(
@@ -162,4 +171,22 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
.build();
return Tuple.of(process, downstream);
}
+
+ /**
+ * Append service label
+ */
+ private void handleServiceLabels(String serviceName, boolean isServiceNormal, List<String> labels, long timeBucket) {
+ if (CollectionUtils.isEmpty(labels)) {
+ return;
+ }
+ for (String label : labels) {
+ final ServiceLabel serviceLabel = new ServiceLabel();
+ serviceLabel.setServiceName(serviceName);
+ serviceLabel.setServiceNormal(isServiceNormal);
+ serviceLabel.setLabel(label);
+ serviceLabel.setTimeBucket(timeBucket);
+
+ sourceReceiver.receive(serviceLabel);
+ }
+ }
}
diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java
index cd125c1ea0..18dd278155 100644
--- a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java
@@ -36,12 +36,12 @@ import org.apache.skywalking.oap.server.core.source.EBPFProfilingData;
import org.apache.skywalking.oap.server.core.source.EBPFProcessProfilingSchedule;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.EBPFProfilingProcessFinder;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.network.trace.component.command.EBPFProfilingTaskCommand;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -77,7 +78,7 @@ public class EBPFProfilingServiceHandler extends EBPFProfilingServiceGrpc.EBPFPr
final long latestUpdateTime = request.getLatestUpdateTime();
try {
// find exists process from agent
- final List<Process> processes = metadataQueryDAO.listProcesses(null, null, agentId);
+ final List<Process> processes = metadataQueryDAO.listProcesses(null, null, agentId, 0, 0);
if (CollectionUtils.isEmpty(processes)) {
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
@@ -85,11 +86,12 @@ public class EBPFProfilingServiceHandler extends EBPFProfilingServiceGrpc.EBPFPr
}
// fetch tasks from process id list
- final EBPFProfilingProcessFinder finder = EBPFProfilingProcessFinder.builder()
- .processIdList(processes.stream().map(Process::getId).collect(Collectors.toList())).build();
- final List<EBPFProfilingTask> tasks = taskDAO.queryTasks(finder, null, 0, latestUpdateTime);
+ final List<String> serviceIdList = processes.stream().map(Process::getServiceId).distinct().collect(Collectors.toList());
+ final List<EBPFProfilingTask> tasks = taskDAO.queryTasks(serviceIdList, null, 0, latestUpdateTime);
+
final Commands.Builder builder = Commands.newBuilder();
- tasks.stream().map(t -> commandService.newEBPFProfilingTaskCommand(t).serialize()).forEach(builder::addCommands);
+ tasks.stream().flatMap(t -> this.buildProfilingCommands(t, processes).stream())
+ .map(EBPFProfilingTaskCommand::serialize).forEach(builder::addCommands);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
return;
@@ -100,6 +102,23 @@ public class EBPFProfilingServiceHandler extends EBPFProfilingServiceGrpc.EBPFPr
responseObserver.onCompleted();
}
+ private List<EBPFProfilingTaskCommand> buildProfilingCommands(EBPFProfilingTask task, List<Process> processes) {
+ final ArrayList<EBPFProfilingTaskCommand> commands = new ArrayList<>(processes.size());
+ for (Process process : processes) {
+ // The service id must match between process and task
+ if (!Objects.equals(process.getServiceId(), task.getServiceId())) {
+ continue;
+ }
+
+ // If the task doesn't require a label or the process match all labels in task
+ if (CollectionUtils.isEmpty(task.getProcessLabels())
+ || process.getLabels().containsAll(task.getProcessLabels())) {
+ commands.add(commandService.newEBPFProfilingTaskCommand(task, process.getId()));
+ }
+ }
+ return commands;
+ }
+
@Override
public StreamObserver<org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingData> collectProfilingData(StreamObserver<Commands> responseObserver) {
return new StreamObserver<org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingData>() {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index b07c1f8faa..24c0cef920 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
@@ -71,6 +72,7 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ESEve
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.LogQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetadataQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetricsQueryEsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ServiceLabelEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileThreadSnapshotQueryEsDAO;
@@ -205,6 +207,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
new EBPFProfilingScheduleEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class,
new EBPFProfilingDataEsDAO(elasticSearchClient, config));
+ this.registerServiceImplementation(IServiceLabelDAO.class,
+ new ServiceLabelEsDAO(elasticSearchClient, config));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
index 450dc264fa..47dd3c2b1d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
@@ -47,12 +47,12 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataD
}
@Override
- public List<EBPFProfilingDataRecord> queryData(String taskId, long beginTime, long endTime) throws IOException {
+ public List<EBPFProfilingDataRecord> queryData(List<String> scheduleIdList, long beginTime, long endTime) throws IOException {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingDataRecord.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
final SearchBuilder search = Search.builder().query(query).size(scrollingBatchSize);
- query.must(Query.term(EBPFProfilingDataRecord.TASK_ID, taskId));
+ query.must(Query.terms(EBPFProfilingDataRecord.SCHEDULE_ID, scheduleIdList));
query.must(Query.range(EBPFProfilingDataRecord.UPLOAD_TIME).gte(beginTime).lt(endTime));
final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
index 0c3a1c174b..1f153727f5 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+import com.google.gson.Gson;
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
@@ -29,10 +30,9 @@ import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
-import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.EBPFProfilingProcessFinder;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
@@ -40,10 +40,15 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class EBPFProfilingTaskEsDAO extends EsDAO implements IEBPFProfilingTaskDAO {
+ private static final Gson GSON = new Gson();
+
private final int taskMaxSize;
public EBPFProfilingTaskEsDAO(ElasticSearchClient client, StorageModuleElasticsearchConfig config) {
@@ -52,23 +57,12 @@ public class EBPFProfilingTaskEsDAO extends EsDAO implements IEBPFProfilingTaskD
}
@Override
- public List<EBPFProfilingTask> queryTasks(EBPFProfilingProcessFinder finder, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException {
+ public List<EBPFProfilingTask> queryTasks(List<String> serviceIdList, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingTaskRecord.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
- if (finder.getFinderType() != null) {
- query.must(Query.term(EBPFProfilingTaskRecord.PROCESS_FIND_TYPE, finder.getFinderType().value()));
- }
- if (StringUtil.isNotEmpty(finder.getServiceId())) {
- query.must(Query.term(EBPFProfilingTaskRecord.SERVICE_ID, finder.getServiceId()));
- }
- if (StringUtil.isNotEmpty(finder.getInstanceId())) {
- query.must(Query.term(EBPFProfilingTaskRecord.INSTANCE_ID, finder.getInstanceId()));
- }
- if (finder.getProcessIdList() != null) {
- query.must(Query.terms(EBPFProfilingTaskRecord.PROCESS_ID, finder.getProcessIdList()));
- }
+ query.must(Query.terms(EBPFProfilingTaskRecord.SERVICE_ID, serviceIdList));
if (targetType != null) {
query.must(Query.term(EBPFProfilingTaskRecord.TARGET_TYPE, targetType.value()));
}
@@ -88,24 +82,25 @@ public class EBPFProfilingTaskEsDAO extends EsDAO implements IEBPFProfilingTaskD
}
private EBPFProfilingTask parseTask(final SearchHit hit) {
- final EBPFProfilingTask task = new EBPFProfilingTask();
- task.setTaskId(hit.getId());
- task.setProcessFinderType(EBPFProfilingProcessFinderType.valueOf(((Number) hit.getSource().get(EBPFProfilingTaskRecord.PROCESS_FIND_TYPE)).intValue()));
- final String serviceId = (String) hit.getSource().get(EBPFProfilingTaskRecord.SERVICE_ID);
- task.setServiceId(serviceId);
- task.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());
- final String instanceId = (String) hit.getSource().get(EBPFProfilingTaskRecord.INSTANCE_ID);
- task.setInstanceId(instanceId);
- task.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());
- task.setProcessId((String) hit.getSource().get(EBPFProfilingTaskRecord.PROCESS_ID));
- task.setProcessName((String) hit.getSource().get(EBPFProfilingTaskRecord.PROCESS_NAME));
- task.setTaskStartTime(((Number) hit.getSource().get(EBPFProfilingTaskRecord.START_TIME)).longValue());
- task.setTriggerType(EBPFProfilingTriggerType.valueOf(((Number) hit.getSource().get(EBPFProfilingTaskRecord.TRIGGER_TYPE)).intValue()));
- task.setFixedTriggerDuration(((Number) hit.getSource().get(EBPFProfilingTaskRecord.FIXED_TRIGGER_DURATION)).longValue());
- task.setTargetType(EBPFProfilingTargetType.valueOf(((Number) hit.getSource().get(EBPFProfilingTaskRecord.TARGET_TYPE)).intValue()));
- task.setCreateTime(((Number) hit.getSource().get(EBPFProfilingTaskRecord.CREATE_TIME)).longValue());
- task.setLastUpdateTime(((Number) hit.getSource().get(EBPFProfilingTaskRecord.LAST_UPDATE_TIME)).longValue());
+ final Map<String, Object> sourceAsMap = hit.getSource();
+ final EBPFProfilingTaskRecord.Builder builder = new EBPFProfilingTaskRecord.Builder();
+ final EBPFProfilingTaskRecord record = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
+ final EBPFProfilingTask task = new EBPFProfilingTask();
+ task.setTaskId(record.id());
+ task.setServiceId(record.getServiceId());
+ task.setServiceName(IDManager.ServiceID.analysisId(record.getServiceId()).getName());
+ if (StringUtil.isNotEmpty(record.getProcessLabelsJson())) {
+ task.setProcessLabels(GSON.<List<String>>fromJson(record.getProcessLabelsJson(), ArrayList.class));
+ } else {
+ task.setProcessLabels(Collections.emptyList());
+ }
+ task.setTaskStartTime(record.getStartTime());
+ task.setTriggerType(EBPFProfilingTriggerType.valueOf(record.getTriggerType()));
+ task.setFixedTriggerDuration(record.getFixedTriggerDuration());
+ task.setTargetType(EBPFProfilingTargetType.valueOf(record.getTargetType()));
+ task.setCreateTime(record.getCreateTime());
+ task.setLastUpdateTime(record.getLastUpdateTime());
return task;
}
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index f80c2e29b1..d4d4c3aee6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -19,14 +19,18 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import com.google.common.base.Strings;
+import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
+import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
@@ -58,6 +62,8 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchC
import static org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE;
public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
+ private static final Gson GSON = new Gson();
+
private final int queryMaxSize;
private final int scrollingBatchSize;
@@ -197,12 +203,35 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
}
@Override
- public List<Process> listProcesses(String serviceId, String instanceId, String agentId) throws IOException {
+ public List<Process> listProcesses(String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
final SearchBuilder search = Search.builder().query(query).size(queryMaxSize);
+ appendProcessWhereQuery(query, serviceId, instanceId, agentId, lastPingStartTimeBucket, lastPingEndTimeBucket);
+ final SearchResponse results = getClient().search(index, search.build());
+
+ return buildProcesses(results);
+ }
+
+ @Override
+ public long getProcessesCount(String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException {
+ final String index =
+ IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
+
+ final BoolQueryBuilder query = Query.bool();
+ final SearchBuilder search = Search.builder().query(query).size(0);
+ appendProcessWhereQuery(query, serviceId, instanceId, agentId, lastPingStartTimeBucket, lastPingEndTimeBucket);
+ final SearchResponse results = getClient().search(index, search.build());
+
+ return results.getHits().getTotal();
+ }
+
+ private void appendProcessWhereQuery(BoolQueryBuilder query, String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) {
if (StringUtil.isNotEmpty(serviceId)) {
query.must(Query.term(ProcessTraffic.SERVICE_ID, serviceId));
}
@@ -212,9 +241,16 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
if (StringUtil.isNotEmpty(agentId)) {
query.must(Query.term(ProcessTraffic.AGENT_ID, agentId));
}
- final SearchResponse results = getClient().search(index, search.build());
-
- return buildProcesses(results);
+ final RangeQueryBuilder rangeQuery = Query.range(ProcessTraffic.LAST_PING_TIME_BUCKET);
+ if (lastPingStartTimeBucket > 0) {
+ rangeQuery.gte(lastPingStartTimeBucket);
+ }
+ if (lastPingEndTimeBucket > 0) {
+ rangeQuery.lte(lastPingEndTimeBucket);
+ }
+ if (lastPingStartTimeBucket > 0 || lastPingEndTimeBucket > 0) {
+ query.must(rangeQuery);
+ }
}
@Override
@@ -310,6 +346,11 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
process.getAttributes().add(new Attribute(key, value));
}
}
+ final String labelsJson = processTraffic.getLabelsJson();
+ if (StringUtils.isNotEmpty(labelsJson)) {
+ final List<String> labels = GSON.<List<String>>fromJson(labelsJson, ArrayList.class);
+ process.getLabels().addAll(labels);
+ }
processes.add(process);
}
return processes;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ServiceLabelEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ServiceLabelEsDAO.java
new file mode 100644
index 0000000000..a024b396a7
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ServiceLabelEsDAO.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
+import org.apache.skywalking.library.elasticsearch.requests.search.Query;
+import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ServiceLabelRecord;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ServiceLabelEsDAO extends EsDAO implements IServiceLabelDAO {
+ private int maxSize;
+
+ public ServiceLabelEsDAO(ElasticSearchClient client, StorageModuleElasticsearchConfig config) {
+ super(client);
+ this.maxSize = config.getMetadataQueryMaxSize();
+ }
+
+ @Override
+ public List<String> queryAllLabels(String serviceId) {
+ final String index =
+ IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceLabelRecord.INDEX_NAME);
+ final BoolQueryBuilder query = Query.bool();
+
+ query.must(Query.term(ServiceLabelRecord.SERVICE_ID, serviceId));
+ final SearchBuilder search = Search.builder().query(query).size(maxSize);
+
+ final SearchResponse response = getClient().search(index, search.build());
+ return response.getHits().getHits().stream().map(this::parseLabel).collect(Collectors.toList());
+ }
+
+ private String parseLabel(final SearchHit hit) {
+ return (String) hit.getSource().get(ServiceLabelRecord.LABEL);
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
index a691da74dd..3602830c1a 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.management.UITemplateManage
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
@@ -64,6 +65,7 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.LogQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetadataQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetricsQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.NetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ServiceLabelQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskLogQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileThreadSnapshotQuery;
@@ -136,6 +138,7 @@ public class InfluxStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new EBPFProfilingTaskQuery(client));
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new EBPFProfilingScheduleQuery(client));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new EBPFProfilingDataQuery(client));
+ this.registerServiceImplementation(IServiceLabelDAO.class, new ServiceLabelQuery(client));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EBPFProfilingDataQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EBPFProfilingDataQuery.java
index 41eb505999..792566c33a 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EBPFProfilingDataQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EBPFProfilingDataQuery.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
+import com.google.common.base.Joiner;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
@@ -35,9 +36,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lt;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.regex;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
@@ -46,7 +47,7 @@ public class EBPFProfilingDataQuery implements IEBPFProfilingDataDAO {
private final InfluxClient client;
@Override
- public List<EBPFProfilingDataRecord> queryData(String taskId, long beginTime, long endTime) throws IOException {
+ public List<EBPFProfilingDataRecord> queryData(List<String> scheduleIdList, long beginTime, long endTime) throws IOException {
final WhereQueryImpl<SelectQueryImpl> query = select(
EBPFProfilingDataRecord.SCHEDULE_ID,
EBPFProfilingDataRecord.TASK_ID,
@@ -58,7 +59,7 @@ public class EBPFProfilingDataQuery implements IEBPFProfilingDataDAO {
.from(client.getDatabase(), EBPFProfilingDataRecord.INDEX_NAME)
.where();
- query.and(eq(EBPFProfilingDataRecord.TASK_ID, taskId));
+ query.and(regex(EBPFProfilingDataRecord.SCHEDULE_ID, "/" + Joiner.on("|").join(scheduleIdList) + "/"));
query.and(gte(EBPFProfilingDataRecord.UPLOAD_TIME, beginTime));
query.and(lt(EBPFProfilingDataRecord.UPLOAD_TIME, endTime));
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EBPFProfilingTaskQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EBPFProfilingTaskQuery.java
index 4b2eb842b9..e75f6fa60a 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EBPFProfilingTaskQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/EBPFProfilingTaskQuery.java
@@ -19,17 +19,15 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
import com.google.common.base.Joiner;
+import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
-import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.EBPFProfilingProcessFinder;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
@@ -51,17 +49,15 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
@RequiredArgsConstructor
public class EBPFProfilingTaskQuery implements IEBPFProfilingTaskDAO {
+ private static final Gson GSON = new Gson();
private final InfluxClient client;
@Override
- public List<EBPFProfilingTask> queryTasks(EBPFProfilingProcessFinder finder, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException {
+ public List<EBPFProfilingTask> queryTasks(List<String> serviceIdList, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException {
final WhereQueryImpl<SelectQueryImpl> query = select(
InfluxConstants.ID_COLUMN,
- EBPFProfilingTaskRecord.PROCESS_FIND_TYPE,
EBPFProfilingTaskRecord.SERVICE_ID,
- EBPFProfilingTaskRecord.INSTANCE_ID,
- EBPFProfilingTaskRecord.PROCESS_ID,
- EBPFProfilingTaskRecord.PROCESS_NAME,
+ EBPFProfilingTaskRecord.PROCESS_LABELS_JSON,
EBPFProfilingTaskRecord.START_TIME,
EBPFProfilingTaskRecord.TRIGGER_TYPE,
EBPFProfilingTaskRecord.FIXED_TRIGGER_DURATION,
@@ -72,18 +68,7 @@ public class EBPFProfilingTaskQuery implements IEBPFProfilingTaskDAO {
.from(client.getDatabase(), EBPFProfilingTaskRecord.INDEX_NAME)
.where();
- if (finder.getFinderType() != null) {
- query.and(eq(EBPFProfilingTaskRecord.PROCESS_FIND_TYPE, finder.getFinderType().value()));
- }
- if (StringUtil.isNotEmpty(finder.getServiceId())) {
- query.and(eq(EBPFProfilingTaskRecord.SERVICE_ID, finder.getServiceId()));
- }
- if (StringUtil.isNotEmpty(finder.getInstanceId())) {
- query.and(eq(EBPFProfilingTaskRecord.INSTANCE_ID, finder.getInstanceId()));
- }
- if (CollectionUtils.isNotEmpty(finder.getProcessIdList())) {
- query.and(regex(EBPFProfilingTaskRecord.PROCESS_ID, "/" + Joiner.on("|").join(finder.getProcessIdList()) + "/"));
- }
+ query.and(regex(EBPFProfilingTaskRecord.SERVICE_ID, "/" + Joiner.on("|").join(serviceIdList) + "/"));
if (targetType != null) {
query.and(eq(EBPFProfilingTaskRecord.TARGET_TYPE, targetType.value()));
}
@@ -111,21 +96,21 @@ public class EBPFProfilingTaskQuery implements IEBPFProfilingTaskDAO {
for (List<Object> values : series.getValues()) {
final EBPFProfilingTask task = new EBPFProfilingTask();
task.setTaskId((String) values.get(1));
- task.setProcessFinderType(EBPFProfilingProcessFinderType.valueOf(((Number) values.get(2)).intValue()));
- final String serviceId = (String) values.get(3);
+ final String serviceId = (String) values.get(2);
task.setServiceId(serviceId);
task.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());
- final String instanceId = (String) values.get(4);
- task.setInstanceId(instanceId);
- task.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());
- task.setProcessId((String) values.get(5));
- task.setProcessName((String) values.get(6));
- task.setTaskStartTime(((Number) values.get(7)).longValue());
- task.setTriggerType(EBPFProfilingTriggerType.valueOf(((Number) values.get(8)).intValue()));
- task.setFixedTriggerDuration(((Number) values.get(9)).longValue());
- task.setTargetType(EBPFProfilingTargetType.valueOf(((Number) values.get(10)).intValue()));
- task.setCreateTime(((Number) values.get(11)).longValue());
- task.setLastUpdateTime(((Number) values.get(12)).longValue());
+ final String processLabelString = (String) values.get(3);
+ if (StringUtil.isNotEmpty(processLabelString)) {
+ task.setProcessLabels(GSON.<List<String>>fromJson(processLabelString, ArrayList.class));
+ } else {
+ task.setProcessLabels(Collections.emptyList());
+ }
+ task.setTaskStartTime(((Number) values.get(4)).longValue());
+ task.setTriggerType(EBPFProfilingTriggerType.valueOf(((Number) values.get(5)).intValue()));
+ task.setFixedTriggerDuration(((Number) values.get(6)).longValue());
+ task.setTargetType(EBPFProfilingTargetType.valueOf(((Number) values.get(7)).intValue()));
+ task.setCreateTime(((Number) values.get(8)).longValue());
+ task.setLastUpdateTime(((Number) values.get(9)).longValue());
tasks.add(task);
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java
index 1f2016d483..b90c241e99 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java
@@ -59,6 +59,7 @@ import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxCon
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@RequiredArgsConstructor
@@ -158,11 +159,34 @@ public class MetadataQuery implements IMetadataQueryDAO {
}
@Override
- public List<Process> listProcesses(String serviceId, String instanceId, String agentId) throws IOException {
+ public List<Process> listProcesses(String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException {
final SelectQueryImpl query = select(
ID_COLUMN, NAME, ProcessTraffic.SERVICE_ID, ProcessTraffic.INSTANCE_ID,
- ProcessTraffic.LAYER, ProcessTraffic.AGENT_ID, ProcessTraffic.DETECT_TYPE, ProcessTraffic.PROPERTIES)
+ ProcessTraffic.LAYER, ProcessTraffic.AGENT_ID, ProcessTraffic.DETECT_TYPE, ProcessTraffic.PROPERTIES,
+ ProcessTraffic.LABELS_JSON)
.from(client.getDatabase(), ProcessTraffic.INDEX_NAME);
+ appendProcessWhereQuery(query, serviceId, instanceId, agentId, lastPingStartTimeBucket, lastPingEndTimeBucket);
+ return buildProcesses(query);
+ }
+
+ @Override
+ public long getProcessesCount(String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException {
+ final SelectQueryImpl query = select().count(ProcessTraffic.PROPERTIES).from(client.getDatabase(), ProcessTraffic.INDEX_NAME);
+ appendProcessWhereQuery(query, serviceId, instanceId, agentId, lastPingStartTimeBucket, lastPingEndTimeBucket);
+
+ List<QueryResult.Result> results = client.query(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result set: {}", query.getCommand(), results);
+ }
+
+ List<QueryResult.Series> counter = results.get(0).getSeries();
+ return ((Number) counter.get(0).getValues().get(0).get(1)).longValue();
+ }
+
+ private void appendProcessWhereQuery(SelectQueryImpl query, String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) {
final WhereQueryImpl<SelectQueryImpl> whereQuery = query.where();
if (StringUtil.isNotEmpty(serviceId)) {
whereQuery.and(eq(TagName.SERVICE_ID, serviceId));
@@ -173,8 +197,12 @@ public class MetadataQuery implements IMetadataQueryDAO {
if (StringUtil.isNotEmpty(agentId)) {
whereQuery.and(eq(TagName.AGENT_ID, agentId));
}
-
- return buildProcesses(query);
+ if (lastPingStartTimeBucket > 0) {
+ whereQuery.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+ }
+ if (lastPingEndTimeBucket > 0) {
+ whereQuery.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket));
+ }
}
@Override
@@ -289,6 +317,11 @@ public class MetadataQuery implements IMetadataQueryDAO {
process.getAttributes().add(new Attribute(key, value));
}
}
+ String labelJson = (String) values.get(9);
+ if (!Strings.isNullOrEmpty(labelJson)) {
+ List<String> labels = GSON.<List<String>>fromJson(labelJson, ArrayList.class);
+ process.getLabels().addAll(labels);
+ }
instances.add(process);
}
return instances;
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ServiceLabelQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ServiceLabelQuery.java
new file mode 100644
index 0000000000..db0164ef8a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ServiceLabelQuery.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ServiceLabelRecord;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.influxdb.dto.QueryResult;
+import org.influxdb.querybuilder.SelectQueryImpl;
+import org.influxdb.querybuilder.WhereQueryImpl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
+
+@Slf4j
+@RequiredArgsConstructor
+public class ServiceLabelQuery implements IServiceLabelDAO {
+ private final InfluxClient client;
+
+ @Override
+ public List<String> queryAllLabels(String serviceId) throws IOException {
+ final WhereQueryImpl<SelectQueryImpl> query = select(
+ ServiceLabelRecord.LABEL
+ )
+ .from(client.getDatabase(), ServiceLabelRecord.INDEX_NAME)
+ .where();
+
+ query.and(eq(ServiceLabelRecord.SERVICE_ID, serviceId));
+
+ return parseLabels(query);
+ }
+
+ private List<String> parseLabels(WhereQueryImpl<SelectQueryImpl> query) throws IOException {
+ final QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, result: {}", query.getCommand(), series);
+ }
+
+ if (Objects.isNull(series)) {
+ return Collections.emptyList();
+ }
+
+ return series.getValues().stream().map(v -> (String) v.get(1)).collect(Collectors.toList());
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java
index fd0e5455ea..c4c4d39521 100644
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java
@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
@@ -70,6 +71,7 @@ import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBEventQue
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBMetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBMetricsQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBServiceLabelQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTraceQueryDAO;
@@ -136,6 +138,7 @@ public class IoTDBStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new IoTDBEBPFProfilingTaskDAO(client));
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new IoTDBEBPFProfilingScheduleDAO(client));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new IoTDBEBPFProfilingDataDAO(client));
+ this.registerServiceImplementation(IServiceLabelDAO.class, new IoTDBServiceLabelQueryDAO(client));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingDataDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingDataDAO.java
index 57c4bc63aa..ef13debe4d 100644
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingDataDAO.java
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingDataDAO.java
@@ -31,6 +31,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
import org.apache.skywalking.oap.server.storage.plugin.iotdb.utils.IoTDBUtils;
@Slf4j
@@ -40,7 +42,7 @@ public class IoTDBEBPFProfilingDataDAO implements IEBPFProfilingDataDAO {
private final StorageBuilder<EBPFProfilingDataRecord> storageBuilder = new EBPFProfilingDataRecord.Builder();
@Override
- public List<EBPFProfilingDataRecord> queryData(String taskId, long beginTime, long endTime)
+ public List<EBPFProfilingDataRecord> queryData(List<String> scheduleIdList, long beginTime, long endTime)
throws IOException {
StringBuilder query = new StringBuilder();
query.append("select * from ");
@@ -49,7 +51,8 @@ public class IoTDBEBPFProfilingDataDAO implements IEBPFProfilingDataDAO {
IoTDBUtils.addQueryIndexValue(EBPFProfilingDataRecord.INDEX_NAME, query, indexAndValueMap);
StringBuilder where = new StringBuilder(" where ");
- where.append(EBPFProfilingDataRecord.TASK_ID).append(" = \"").append(taskId).append("\" and ");
+ where.append(EBPFProfilingDataRecord.SCHEDULE_ID).append(" in (").append(
+ scheduleIdList.stream().map(s -> "\"" + s + "\"").collect(Collectors.joining(","))).append(") and ");
where.append(EBPFProfilingDataRecord.UPLOAD_TIME).append(" >= ").append(beginTime).append(" and ");
where.append(EBPFProfilingDataRecord.UPLOAD_TIME).append(" <= ").append(endTime).append(" and ");
if (where.length() > 7) {
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingTaskDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingTaskDAO.java
index c4ec2dfebb..cbdf8b4853 100644
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingTaskDAO.java
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingTaskDAO.java
@@ -18,25 +18,24 @@
package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.EBPFProfilingProcessFinder;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,35 +44,25 @@ import org.apache.skywalking.oap.server.storage.plugin.iotdb.utils.IoTDBUtils;
@Slf4j
@RequiredArgsConstructor
public class IoTDBEBPFProfilingTaskDAO implements IEBPFProfilingTaskDAO {
+ private static final Gson GSON = new Gson();
+
private final IoTDBClient client;
private final StorageBuilder<EBPFProfilingTaskRecord> storageBuilder = new EBPFProfilingTaskRecord.Builder();
@Override
- public List<EBPFProfilingTask> queryTasks(EBPFProfilingProcessFinder finder,
+ public List<EBPFProfilingTask> queryTasks(List<String> serviceIdList,
EBPFProfilingTargetType targetType,
long taskStartTime, long latestUpdateTime)
throws IOException {
StringBuilder query = new StringBuilder();
query.append("select * from ");
IoTDBUtils.addModelPath(client.getStorageGroup(), query, EBPFProfilingTaskRecord.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- if (StringUtil.isNotEmpty(finder.getServiceId())) {
- indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, finder.getServiceId());
- }
- if (StringUtil.isNotEmpty(finder.getInstanceId())) {
- indexAndValueMap.put(IoTDBIndexes.INSTANCE_ID_INX, finder.getInstanceId());
- }
- if (CollectionUtils.isNotEmpty(finder.getProcessIdList())) {
- final List<String> processIdList = finder.getProcessIdList();
- for (int i = 0; i < processIdList.size(); i++) {
- if (i > 0) {
- query.append(", ");
- }
- final HashMap<String, String> indexWithProcessId = new HashMap<>(indexAndValueMap);
- indexWithProcessId.put(IoTDBIndexes.PROCESS_ID_INX, processIdList.get(i));
- IoTDBUtils.addQueryIndexValue(EBPFProfilingTaskRecord.INDEX_NAME, query, indexWithProcessId);
+ for (int i = 0; i < serviceIdList.size(); i++) {
+ if (i > 0) {
+ query.append(", ");
}
- } else {
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceIdList.get(i));
IoTDBUtils.addQueryIndexValue(EBPFProfilingTaskRecord.INDEX_NAME, query, indexAndValueMap);
}
@@ -105,13 +94,14 @@ public class IoTDBEBPFProfilingTaskDAO implements IEBPFProfilingTaskDAO {
private EBPFProfilingTask parseTask(EBPFProfilingTaskRecord record) {
final EBPFProfilingTask task = new EBPFProfilingTask();
task.setTaskId(record.id());
- task.setProcessFinderType(EBPFProfilingProcessFinderType.valueOf(record.getProcessFindType()));
task.setServiceId(record.getServiceId());
task.setServiceName(IDManager.ServiceID.analysisId(record.getServiceId()).getName());
- task.setInstanceId(record.getInstanceId());
- task.setInstanceName(IDManager.ServiceInstanceID.analysisId(record.getInstanceId()).getName());
- task.setProcessId(record.getProcessId());
- task.setProcessName(record.getProcessName());
+ final String processLabelsJson = record.getProcessLabelsJson();
+ if (StringUtil.isNotEmpty(processLabelsJson)) {
+ task.setProcessLabels(GSON.<List<String>>fromJson(processLabelsJson, ArrayList.class));
+ } else {
+ task.setProcessLabels(Collections.emptyList());
+ }
task.setTaskStartTime(record.getStartTime());
task.setTriggerType(EBPFProfilingTriggerType.valueOf(record.getTriggerType()));
task.setFixedTriggerDuration(record.getFixedTriggerDuration());
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java
index ad3e6b11aa..24921211f9 100644
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
import com.google.common.base.Strings;
+import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
@@ -54,6 +55,8 @@ import org.apache.skywalking.oap.server.storage.plugin.iotdb.utils.IoTDBUtils;
@Slf4j
@RequiredArgsConstructor
public class IoTDBMetadataQueryDAO implements IMetadataQueryDAO {
+ private static final Gson GSON = new Gson();
+
private final IoTDBClient client;
private final StorageBuilder<ServiceTraffic> serviceBuilder = new ServiceTraffic.Builder();
private final StorageBuilder<EndpointTraffic> endpointBuilder = new EndpointTraffic.Builder();
@@ -156,10 +159,32 @@ public class IoTDBMetadataQueryDAO implements IMetadataQueryDAO {
}
@Override
- public List<Process> listProcesses(String serviceId, String instanceId, String agentId)
+ public List<Process> listProcesses(String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket)
throws IOException {
StringBuilder query = new StringBuilder();
query.append("select * from ");
+ appendProcessFromQuery(query, serviceId, instanceId, agentId, lastPingStartTimeBucket, lastPingEndTimeBucket);
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ProcessTraffic.INDEX_NAME,
+ query.toString(), processBuilder);
+ return buildProcesses(storageDataList);
+ }
+
+ @Override
+ public long getProcessesCount(String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select count(" + ProcessTraffic.PROPERTIES + ") from ");
+ appendProcessFromQuery(query, serviceId, instanceId, agentId, lastPingStartTimeBucket, lastPingEndTimeBucket);
+
+ final List<Double> results = client.queryWithAgg(query.toString());
+ return results.size() > 0 ? results.get(0).longValue() : 0;
+ }
+
+ private void appendProcessFromQuery(StringBuilder query, String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) {
IoTDBUtils.addModelPath(client.getStorageGroup(), query, ProcessTraffic.INDEX_NAME);
Map<String, String> indexAndValueMap = new HashMap<>();
if (StringUtil.isNotEmpty(serviceId)) {
@@ -172,11 +197,20 @@ public class IoTDBMetadataQueryDAO implements IMetadataQueryDAO {
indexAndValueMap.put(IoTDBIndexes.AGENT_ID_INX, agentId);
}
IoTDBUtils.addQueryIndexValue(ProcessTraffic.INDEX_NAME, query, indexAndValueMap);
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
- List<? super StorageData> storageDataList = client.filterQuery(ProcessTraffic.INDEX_NAME,
- query.toString(), processBuilder);
- return buildProcesses(storageDataList);
+ StringBuilder where = new StringBuilder();
+ if (lastPingStartTimeBucket > 0) {
+ where.append(ProcessTraffic.LAST_PING_TIME_BUCKET).append(" >= ").append(lastPingStartTimeBucket);
+ }
+ if (lastPingEndTimeBucket > 0) {
+ if (where.length() > 0) {
+ where.append(" and ");
+ }
+ where.append(ProcessTraffic.LAST_PING_TIME_BUCKET).append(" <= ").append(lastPingEndTimeBucket);
+ }
+ if (where.length() > 0) {
+ query.append(" where ").append(where);
+ }
}
@Override
@@ -267,6 +301,11 @@ public class IoTDBMetadataQueryDAO implements IMetadataQueryDAO {
process.getAttributes().add(new Attribute(key, value));
}
}
+ final String labelsJson = processTraffic.getLabelsJson();
+ if (StringUtil.isNotEmpty(labelsJson)) {
+ final List<String> labels = GSON.<List<String>>fromJson(labelsJson, ArrayList.class);
+ process.getLabels().addAll(labels);
+ }
processes.add(process);
});
return processes;
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingDataDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBServiceLabelQueryDAO.java
similarity index 53%
copy from oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingDataDAO.java
copy to oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBServiceLabelQueryDAO.java
index 57c4bc63aa..74f5159230 100644
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEBPFProfilingDataDAO.java
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBServiceLabelQueryDAO.java
@@ -6,63 +6,51 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ServiceLabelRecord;
import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.utils.IoTDBUtils;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.utils.IoTDBUtils;
+import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
-public class IoTDBEBPFProfilingDataDAO implements IEBPFProfilingDataDAO {
+public class IoTDBServiceLabelQueryDAO implements IServiceLabelDAO {
private final IoTDBClient client;
- private final StorageBuilder<EBPFProfilingDataRecord> storageBuilder = new EBPFProfilingDataRecord.Builder();
+ private final StorageBuilder<ServiceLabelRecord> storageBuilder = new ServiceLabelRecord.Builder();
@Override
- public List<EBPFProfilingDataRecord> queryData(String taskId, long beginTime, long endTime)
- throws IOException {
+ public List<String> queryAllLabels(String serviceId) throws IOException {
StringBuilder query = new StringBuilder();
query.append("select * from ");
- IoTDBUtils.addModelPath(client.getStorageGroup(), query, EBPFProfilingDataRecord.INDEX_NAME);
+ IoTDBUtils.addModelPath(client.getStorageGroup(), query, ServiceLabelRecord.INDEX_NAME);
Map<String, String> indexAndValueMap = new HashMap<>();
- IoTDBUtils.addQueryIndexValue(EBPFProfilingDataRecord.INDEX_NAME, query, indexAndValueMap);
-
- StringBuilder where = new StringBuilder(" where ");
- where.append(EBPFProfilingDataRecord.TASK_ID).append(" = \"").append(taskId).append("\" and ");
- where.append(EBPFProfilingDataRecord.UPLOAD_TIME).append(" >= ").append(beginTime).append(" and ");
- where.append(EBPFProfilingDataRecord.UPLOAD_TIME).append(" <= ").append(endTime).append(" and ");
- if (where.length() > 7) {
- int length = where.length();
- where.delete(length - 5, length);
- query.append(where);
- }
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
+ IoTDBUtils.addQueryIndexValue(ServiceLabelRecord.INDEX_NAME, query, indexAndValueMap);
query.append(IoTDBClient.ALIGN_BY_DEVICE);
- List<? super StorageData> storageDataList = client.filterQuery(EBPFProfilingDataRecord.INDEX_NAME,
- query.toString(), storageBuilder);
- List<EBPFProfilingDataRecord> dataList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> dataList.add((EBPFProfilingDataRecord) storageData));
- return dataList;
+ List<? super StorageData> storageDataList = client.filterQuery(ServiceLabelRecord.INDEX_NAME,
+ query.toString(), storageBuilder);
+ return storageDataList.stream().map(t -> ((ServiceLabelRecord) t).getLabel()).collect(Collectors.toList());
}
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 997a24462d..fec1fe4aed 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -35,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
@@ -67,6 +68,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2LogQueryDAO
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceLabelQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
@@ -176,6 +178,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new H2EBPFProfilingTaskDAO(h2Client));
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new H2EBPFProfilingScheduleDAO(h2Client));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new H2EBPFProfilingDataDAO(h2Client));
+ this.registerServiceImplementation(IServiceLabelDAO.class, new H2ServiceLabelQueryDAO(h2Client));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EBPFProfilingDataDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EBPFProfilingDataDAO.java
index b65f149eb0..6ef6a2dda6 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EBPFProfilingDataDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EBPFProfilingDataDAO.java
@@ -37,13 +37,13 @@ public class H2EBPFProfilingDataDAO implements IEBPFProfilingDataDAO {
private JDBCHikariCPClient h2Client;
@Override
- public List<EBPFProfilingDataRecord> queryData(String taskId, long beginTime, long endTime) throws IOException {
+ public List<EBPFProfilingDataRecord> queryData(List<String> scheduleIdList, long beginTime, long endTime) throws IOException {
final StringBuilder sql = new StringBuilder();
final StringBuilder conditionSql = new StringBuilder();
- List<Object> condition = new ArrayList<>(4);
+ List<Object> condition = new ArrayList<>(scheduleIdList.size() + 2);
sql.append("select * from ").append(EBPFProfilingDataRecord.INDEX_NAME);
- appendCondition(conditionSql, condition, EBPFProfilingDataRecord.TASK_ID, "=", taskId);
+ appendListCondition(conditionSql, condition, EBPFProfilingDataRecord.SCHEDULE_ID, scheduleIdList);
appendCondition(conditionSql, condition, EBPFProfilingDataRecord.UPLOAD_TIME, ">=", beginTime);
appendCondition(conditionSql, condition, EBPFProfilingDataRecord.UPLOAD_TIME, "<", endTime);
@@ -87,4 +87,19 @@ public class H2EBPFProfilingDataDAO implements IEBPFProfilingDataDAO {
conditionSql.append(filed).append(compare).append("?");
condition.add(data);
}
+
+ private <T> void appendListCondition(StringBuilder conditionSql, List<Object> condition, String filed, List<T> data) {
+ if (conditionSql.length() > 0) {
+ conditionSql.append(" and ");
+ }
+ conditionSql.append(filed).append(" in (");
+ for (int i = 0; i < data.size(); i++) {
+ if (i > 0) {
+ conditionSql.append(", ");
+ }
+ conditionSql.append("?");
+ condition.add(data.get(i));
+ }
+ conditionSql.append(")");
+ }
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EBPFProfilingTaskDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EBPFProfilingTaskDAO.java
index 31872b2151..50e50d3ff6 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EBPFProfilingTaskDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2EBPFProfilingTaskDAO.java
@@ -18,17 +18,15 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingProcessFinderType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
-import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.EBPFProfilingProcessFinder;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import java.io.IOException;
@@ -36,36 +34,23 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
@AllArgsConstructor
public class H2EBPFProfilingTaskDAO implements IEBPFProfilingTaskDAO {
+ private static final Gson GSON = new Gson();
private JDBCHikariCPClient h2Client;
@Override
- public List<EBPFProfilingTask> queryTasks(EBPFProfilingProcessFinder finder, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException {
+ public List<EBPFProfilingTask> queryTasks(List<String> serviceIdList, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException {
final StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(7);
sql.append("select * from ").append(EBPFProfilingTaskRecord.INDEX_NAME);
StringBuilder conditionSql = new StringBuilder();
- if (finder.getFinderType() != null) {
- appendCondition(conditionSql, condition,
- EBPFProfilingTaskRecord.PROCESS_FIND_TYPE, finder.getFinderType().value());
- }
- if (StringUtil.isNotEmpty(finder.getServiceId())) {
- appendCondition(conditionSql, condition,
- EBPFProfilingTaskRecord.SERVICE_ID, finder.getServiceId());
- }
- if (StringUtil.isNotEmpty(finder.getInstanceId())) {
- appendCondition(conditionSql, condition,
- EBPFProfilingTaskRecord.INSTANCE_ID, finder.getInstanceId());
- }
- if (CollectionUtils.isNotEmpty(finder.getProcessIdList())) {
- appendListCondition(conditionSql, condition,
- EBPFProfilingTaskRecord.PROCESS_ID, finder.getProcessIdList());
- }
+ appendListCondition(conditionSql, condition, EBPFProfilingTaskRecord.SERVICE_ID, serviceIdList);
if (targetType != null) {
appendCondition(conditionSql, condition,
EBPFProfilingTaskRecord.TARGET_TYPE, targetType.value());
@@ -98,16 +83,15 @@ public class H2EBPFProfilingTaskDAO implements IEBPFProfilingTaskDAO {
while (resultSet.next()) {
EBPFProfilingTask task = new EBPFProfilingTask();
task.setTaskId(resultSet.getString(H2TableInstaller.ID_COLUMN));
- task.setProcessFinderType(EBPFProfilingProcessFinderType.valueOf(
- resultSet.getInt(EBPFProfilingTaskRecord.PROCESS_FIND_TYPE)));
final String serviceId = resultSet.getString(EBPFProfilingTaskRecord.SERVICE_ID);
task.setServiceId(serviceId);
task.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());
- final String instanceId = resultSet.getString(EBPFProfilingTaskRecord.INSTANCE_ID);
- task.setInstanceId(instanceId);
- task.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());
- task.setProcessId(resultSet.getString(EBPFProfilingTaskRecord.PROCESS_ID));
- task.setProcessName(resultSet.getString(EBPFProfilingTaskRecord.PROCESS_NAME));
+ final String processLabelString = resultSet.getString(EBPFProfilingTaskRecord.PROCESS_LABELS_JSON);
+ if (StringUtil.isNotEmpty(processLabelString)) {
+ task.setProcessLabels(GSON.<List<String>>fromJson(processLabelString, ArrayList.class));
+ } else {
+ task.setProcessLabels(Collections.emptyList());
+ }
task.setTaskStartTime(resultSet.getLong(EBPFProfilingTaskRecord.START_TIME));
task.setTriggerType(EBPFProfilingTriggerType.valueOf(
resultSet.getInt(EBPFProfilingTaskRecord.TRIGGER_TYPE)));
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetadataQueryDAO.java
index 7b3d10f841..1e7df61223 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetadataQueryDAO.java
@@ -232,10 +232,48 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
}
@Override
- public List<Process> listProcesses(String serviceId, String instanceId, String agentId) throws IOException {
+ public List<Process> listProcesses(String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException {
StringBuilder sql = new StringBuilder();
- List<Object> condition = new ArrayList<>(5);
+ List<Object> condition = new ArrayList<>(3);
sql.append("select * from ").append(ProcessTraffic.INDEX_NAME);
+ appendProcessWhereQuery(sql, condition, serviceId, instanceId, agentId, lastPingStartTimeBucket, lastPingEndTimeBucket);
+ sql.append(" limit ").append(metadataQueryMaxSize);
+
+ try (Connection connection = h2Client.getConnection()) {
+ try (ResultSet resultSet = h2Client.executeQuery(
+ connection, sql.toString(), condition.toArray(new Object[0]))) {
+ return buildProcesses(resultSet);
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public long getProcessesCount(String serviceId, String instanceId, String agentId,
+ final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) throws IOException {
+ StringBuilder sql = new StringBuilder();
+ List<Object> condition = new ArrayList<>(3);
+ sql.append("select count(1) total from ").append(ProcessTraffic.INDEX_NAME);
+ appendProcessWhereQuery(sql, condition, serviceId, instanceId, agentId,
+ lastPingStartTimeBucket, lastPingEndTimeBucket);
+
+ try (Connection connection = h2Client.getConnection()) {
+ try (ResultSet resultSet = h2Client.executeQuery(
+ connection, sql.toString(), condition.toArray(new Object[0]))) {
+ if (!resultSet.next()) {
+ return 0;
+ }
+ return resultSet.getLong("total");
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void appendProcessWhereQuery(StringBuilder sql, List<Object> condition, String serviceId, String instanceId,
+ String agentId, final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) {
if (StringUtil.isNotEmpty(serviceId) || StringUtil.isNotEmpty(instanceId) || StringUtil.isNotEmpty(agentId)) {
sql.append(" where ");
}
@@ -258,16 +296,19 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
sql.append(ProcessTraffic.AGENT_ID).append("=?");
condition.add(agentId);
}
-
- sql.append(" limit ").append(metadataQueryMaxSize);
-
- try (Connection connection = h2Client.getConnection()) {
- try (ResultSet resultSet = h2Client.executeQuery(
- connection, sql.toString(), condition.toArray(new Object[0]))) {
- return buildProcesses(resultSet);
+ if (lastPingStartTimeBucket > 0) {
+ if (!condition.isEmpty()) {
+ sql.append(" and ");
}
- } catch (SQLException e) {
- throw new IOException(e);
+ sql.append(ProcessTraffic.LAST_PING_TIME_BUCKET).append(">=?");
+ condition.add(lastPingStartTimeBucket);
+ }
+ if (lastPingEndTimeBucket > 0) {
+ if (!condition.isEmpty()) {
+ sql.append(" and ");
+ }
+ sql.append(ProcessTraffic.LAST_PING_TIME_BUCKET).append("<=?");
+ condition.add(lastPingEndTimeBucket);
}
}
@@ -316,6 +357,11 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
process.getAttributes().add(new Attribute(key, value));
}
}
+ final String labelJsonString = resultSet.getString(ProcessTraffic.LABELS_JSON);
+ if (!Strings.isNullOrEmpty(labelJsonString)) {
+ List<String> labels = GSON.<List<String>>fromJson(labelJsonString, ArrayList.class);
+ process.getLabels().addAll(labels);
+ }
processes.add(process);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceLabelQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceLabelQueryDAO.java
new file mode 100644
index 0000000000..c5f86cf5ee
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceLabelQueryDAO.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+
+import lombok.AllArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ServiceLabelRecord;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+@AllArgsConstructor
+public class H2ServiceLabelQueryDAO implements IServiceLabelDAO {
+ private JDBCHikariCPClient h2Client;
+
+ @Override
+ public List<String> queryAllLabels(String serviceId) throws IOException {
+ final StringBuilder sql = new StringBuilder();
+ List<Object> condition = new ArrayList<>(1);
+ sql.append("select " + ServiceLabelRecord.LABEL + " from ")
+ .append(ServiceLabelRecord.INDEX_NAME)
+ .append(" where ").append(ServiceLabelRecord.SERVICE_ID).append(" = ?");
+ condition.add(serviceId);
+
+ try (Connection connection = h2Client.getConnection()) {
+ try (ResultSet resultSet = h2Client.executeQuery(
+ connection, sql.toString(), condition.toArray(new Object[0]))) {
+ return parseLabels(resultSet);
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private List<String> parseLabels(ResultSet resultSet) throws SQLException {
+ final List<String> labels = new ArrayList<>();
+ while (resultSet.next()) {
+ labels.add(resultSet.getString(ServiceLabelRecord.LABEL));
+ }
+ return labels;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index a5beae9613..2972af86a9 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -34,6 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
@@ -62,6 +63,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDele
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceLabelQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
@@ -162,6 +164,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new H2EBPFProfilingTaskDAO(mysqlClient));
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new H2EBPFProfilingScheduleDAO(mysqlClient));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new H2EBPFProfilingDataDAO(mysqlClient));
+ this.registerServiceImplementation(IServiceLabelDAO.class, new H2ServiceLabelQueryDAO(mysqlClient));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
index 18d9096c27..643595740a 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
@@ -34,6 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
@@ -61,6 +62,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EventQueryD
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceLabelQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
@@ -162,6 +164,7 @@ public class PostgreSQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new H2EBPFProfilingTaskDAO(postgresqlClient));
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new H2EBPFProfilingScheduleDAO(postgresqlClient));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new H2EBPFProfilingDataDAO(postgresqlClient));
+ this.registerServiceImplementation(IServiceLabelDAO.class, new H2ServiceLabelQueryDAO(postgresqlClient));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
index 5fd4db2b6e..92dc067737 100644
--- a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
@@ -34,6 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
@@ -62,6 +63,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDele
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceLabelQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
@@ -167,6 +169,7 @@ public class TiDBStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new H2EBPFProfilingTaskDAO(mysqlClient));
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new H2EBPFProfilingScheduleDAO(mysqlClient));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new H2EBPFProfilingDataDAO(mysqlClient));
+ this.registerServiceImplementation(IServiceLabelDAO.class, new H2ServiceLabelQueryDAO(mysqlClient));
}
@Override
diff --git a/pom.xml b/pom.xml
index 36c4bcc745..c346ec0f4c 100755
--- a/pom.xml
+++ b/pom.xml
@@ -221,6 +221,7 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
diff --git a/test/e2e-v2/cases/profiling/ebpf/docker-compose.yml b/test/e2e-v2/cases/profiling/ebpf/docker-compose.yml
index d35308a959..6e66c6acb6 100644
--- a/test/e2e-v2/cases/profiling/ebpf/docker-compose.yml
+++ b/test/e2e-v2/cases/profiling/ebpf/docker-compose.yml
@@ -44,6 +44,7 @@ services:
ROVER_PROCESS_DISCOVERY_VM_FINDER_SERVICE_NAME: sqrt
ROVER_PROCESS_DISCOVERY_VM_FINDER_INSTANCE_NAME: test-instance
ROVER_PROCESS_DISCOVERY_VM_FINDER_PROCESS_NAME: "{{.Process.ExeName}}"
+ ROVER_PROCESS_DISCOVERY_VM_FINDER_PROCESS_LABELS: e2e-label1,e2e-label2
ROVER_PROFILING_ACTIVE: "true"
ROVER_PROFILING_CHECK_INTERVAL: 2s
ROVER_PROFILING_FLUSH_INTERVAL: 5s
diff --git a/test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml b/test/e2e-v2/cases/profiling/ebpf/expected/process-estimate-scale.yml
similarity index 63%
copy from test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml
copy to test/e2e-v2/cases/profiling/ebpf/expected/process-estimate-scale.yml
index 05abb5e723..a4bfca0c7c 100644
--- a/test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml
+++ b/test/e2e-v2/cases/profiling/ebpf/expected/process-estimate-scale.yml
@@ -13,18 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-{{- contains . }}
-- taskid: {{ notEmpty .taskid }}
- processfindertype: PROCESS_ID
- serviceid: {{ b64enc "sqrt" }}.1
- servicename: sqrt
- instanceid: {{ b64enc "sqrt" }}.1_{{ b64enc "test-instance" }}
- instancename: test-instance
- processid: {{ notEmpty .processid }}
- processname: sqrt
- taskstarttime: {{ gt .taskstarttime 0 }}
- triggertype: FIXED_TIME
- fixedtriggerduration: 60
- targettype: ON_CPU
- createtime: {{ gt .createtime 0 }}
-{{- end }}
\ No newline at end of file
+1
\ No newline at end of file
diff --git a/test/e2e-v2/cases/profiling/ebpf/expected/process.yml b/test/e2e-v2/cases/profiling/ebpf/expected/process.yml
index 17f1911a4e..31fac87881 100644
--- a/test/e2e-v2/cases/profiling/ebpf/expected/process.yml
+++ b/test/e2e-v2/cases/profiling/ebpf/expected/process.yml
@@ -32,4 +32,9 @@
- name: command_line
value: /sqrt
{{- end }}
+ labels:
+ {{- contains .labels }}
+ - e2e-label1
+ - e2e-label2
+ {{- end }}
{{- end }}
\ No newline at end of file
diff --git a/test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml b/test/e2e-v2/cases/profiling/ebpf/expected/profiling-create-prepare.yml
similarity index 63%
copy from test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml
copy to test/e2e-v2/cases/profiling/ebpf/expected/profiling-create-prepare.yml
index 05abb5e723..b1226a33ef 100644
--- a/test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml
+++ b/test/e2e-v2/cases/profiling/ebpf/expected/profiling-create-prepare.yml
@@ -13,18 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-{{- contains . }}
-- taskid: {{ notEmpty .taskid }}
- processfindertype: PROCESS_ID
- serviceid: {{ b64enc "sqrt" }}.1
- servicename: sqrt
- instanceid: {{ b64enc "sqrt" }}.1_{{ b64enc "test-instance" }}
- instancename: test-instance
- processid: {{ notEmpty .processid }}
- processname: sqrt
- taskstarttime: {{ gt .taskstarttime 0 }}
- triggertype: FIXED_TIME
- fixedtriggerduration: 60
- targettype: ON_CPU
- createtime: {{ gt .createtime 0 }}
-{{- end }}
\ No newline at end of file
+couldprofiling: true
+processlabels:
+ {{- contains .processlabels }}
+ - e2e-label1
+ - e2e-label2
+ {{- end }}
\ No newline at end of file
diff --git a/test/e2e-v2/cases/profiling/ebpf/expected/profiling-schedule-list.yml b/test/e2e-v2/cases/profiling/ebpf/expected/profiling-schedule-list.yml
index d1f788c74f..e19b699183 100644
--- a/test/e2e-v2/cases/profiling/ebpf/expected/profiling-schedule-list.yml
+++ b/test/e2e-v2/cases/profiling/ebpf/expected/profiling-schedule-list.yml
@@ -35,6 +35,11 @@
- name: command_line
value: /sqrt
{{- end }}
+ labels:
+ {{- contains .process.labels }}
+ - e2e-label1
+ - e2e-label2
+ {{- end }}
starttime: {{ gt .starttime 0 }}
endtime: {{ gt .endtime 0 }}
{{- end }}
\ No newline at end of file
diff --git a/test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml b/test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml
index 05abb5e723..f561c46ca7 100644
--- a/test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml
+++ b/test/e2e-v2/cases/profiling/ebpf/expected/profiling-task-list.yml
@@ -15,13 +15,13 @@
{{- contains . }}
- taskid: {{ notEmpty .taskid }}
- processfindertype: PROCESS_ID
serviceid: {{ b64enc "sqrt" }}.1
servicename: sqrt
- instanceid: {{ b64enc "sqrt" }}.1_{{ b64enc "test-instance" }}
- instancename: test-instance
- processid: {{ notEmpty .processid }}
- processname: sqrt
+ processlabels:
+ {{- contains .processlabels }}
+ - e2e-label1
+ - e2e-label2
+ {{- end }}
taskstarttime: {{ gt .taskstarttime 0 }}
triggertype: FIXED_TIME
fixedtriggerduration: 60
diff --git a/test/e2e-v2/cases/profiling/ebpf/profiling-cases.yaml b/test/e2e-v2/cases/profiling/ebpf/profiling-cases.yaml
index 81cdda8885..902a3be9eb 100644
--- a/test/e2e-v2/cases/profiling/ebpf/profiling-cases.yaml
+++ b/test/e2e-v2/cases/profiling/ebpf/profiling-cases.yaml
@@ -20,30 +20,26 @@ cases:
expected: expected/service.yml
- query: swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml instance ls --service-name sqrt
expected: expected/instance.yml
- - query: swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml process ls --service-name sqrt
+ - query: swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml process ls --service-name sqrt --instance-name test-instance
expected: expected/process.yml
+ - query: swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf create prepare --service-name sqrt
+ expected: expected/profiling-create-prepare.yml
+ - query: swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml process estimate scale --service-name sqrt --labels e2e-label1,e2e-label2
+ expected: expected/process-estimate-scale.yml
- query: |
- swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf create fixed --process-id=$( \
- swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml process ls --service-name sqrt| yq e '.[0].id' - \
- ) --duration=1m
+ swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf create fixed --service-name sqrt --labels e2e-label1,e2e-label2 --duration 1m
expected: expected/profiling-create.yml
- - query: |
- swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf list --process-id=$( \
- swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml process ls --service-name sqrt| yq e '.[0].id' - \
- )
+ - query: swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf list --service-name sqrt
expected: expected/profiling-task-list.yml
- query: |
swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf schedules --task-id=$( \
- swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf list --process-id=$( \
- swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml process ls --service-name sqrt| yq e '.[0].id' - \
- ) |yq e '.[0].taskid' - \
+ swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf list --service-name sqrt |yq e '.[0].taskid' -
)
expected: expected/profiling-schedule-list.yml
- query: |
- taskid=$(swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf list --process-id=$( \
- swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml process ls --service-name sqrt| yq e '.[0].id' - \
- ) |yq e '.[0].taskid' -);
+ taskid=$(swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf list --service-name sqrt |yq e '.[0].taskid' -)
+ scheduleid=$(swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf schedules --task-id=$taskid |yq e '.[0].scheduleid' -);
start=$(swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf schedules --task-id=$taskid | yq e '.[0].starttime' -)
end=$(swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf schedules --task-id=$taskid | yq e '.[0].endtime' -)
- swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf analysis --task-id=$taskid --time-ranges=$start-$end
+ swctl --base-url=http://${oap_host}:${oap_12800}/graphql --display yaml profiling ebpf analysis --schedule-id=$scheduleid --time-ranges=$start-$end
expected: expected/profiling-analysis.yml
\ No newline at end of file
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index a71b9c4f49..d5fe55a472 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -22,6 +22,6 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
-SW_ROVER_COMMIT=6244cc59d1a813aa01116b68b1f3a7bbd0fe45ad
+SW_ROVER_COMMIT=f7f5ac31aa2288861ca729ed349f0da9e66d4558
-SW_CTL_COMMIT=60cee4a926a867a7bd934b7f94c9a8517e141608
+SW_CTL_COMMIT=5a62c2e029e17234e6bbad18ced0ce31d0f67ce9