Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/29 04:53:00 UTC
[skywalking] branch master updated: Support autocomplete tags in traces query (backend side), Replace all configurations **_JETTY_** to **_REST_** (#8971)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 12ba7aa533 Support autocomplete tags in traces query (backend side), Replace all configurations **_JETTY_** to **_REST_** (#8971)
12ba7aa533 is described below
commit 12ba7aa5336af9b323bb2acbc1d864b6b3c416df
Author: Wan Kai <wa...@foxmail.com>
AuthorDate: Fri Apr 29 12:52:45 2022 +0800
Support autocomplete tags in traces query (backend side), Replace all configurations **_JETTY_** to **_REST_** (#8971)
* Support autocomplete tags in traces query (backend side)
* [Breaking Change] Replace configuration `**_JETTY_**` to `**_REST_**`.
---
docs/en/changes/changes.md | 2 +
docs/en/setup/backend/configuration-vocabulary.md | 28 ++++---
docs/en/setup/backend/zipkin-trace.md | 18 +++--
.../parser/listener/SegmentAnalysisListener.java | 13 +++
.../manual/searchtag/TagAutocompleteData.java | 94 ++++++++++++++++++++++
.../manual/segment/TraceTagAutocompleteData.java | 55 +++++++++++++
.../segment/TraceTagAutocompleteDispatcher.java | 36 +++++++++
.../oap/server/core/query/TraceQueryService.java | 13 +++
.../oap/server/core/source/DefaultScopeDefine.java | 1 +
.../oap/server/core/source/TagAutocomplete.java} | 37 +++++----
.../server/core/source/TraceTagAutocomplete.java} | 21 +----
.../server/core/storage/query/ITraceQueryDAO.java | 9 +++
.../oap/query/graphql/resolver/TraceQuery.java | 10 +++
.../src/main/resources/query-protocol | 2 +-
.../receiver/zipkin/ZipkinReceiverConfig.java | 21 +++--
.../receiver/zipkin/ZipkinReceiverProvider.java | 14 ++--
.../server/receiver/zipkin/trace/SpanForward.java | 19 ++++-
.../src/main/resources/application.yml | 33 ++++----
.../config/ApplicationConfigLoaderTestCase.java | 4 +-
.../elasticsearch/query/TraceQueryEsDAO.java | 70 ++++++++++++++++
.../storage/plugin/influxdb/InfluxConstants.java | 4 +
.../storage/plugin/influxdb/TableMetaInfo.java | 11 +++
.../storage/plugin/influxdb/query/TraceQuery.java | 70 ++++++++++++++++
.../server/storage/plugin/iotdb/IoTDBIndexes.java | 3 +-
.../storage/plugin/iotdb/IoTDBTableMetaInfo.java | 3 +
.../plugin/iotdb/query/IoTDBTraceQueryDAO.java | 87 ++++++++++++++++++++
.../plugin/jdbc/h2/dao/H2TraceQueryDAO.java | 70 ++++++++++++++++
.../elasticsearch/ZipkinTraceQueryEsDAO.java | 73 ++++++++++++++++-
.../profile/exporter/test/ProfileTraceDAO.java | 17 ++++
29 files changed, 748 insertions(+), 90 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 1a7093b9be..10ec023410 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -30,6 +30,8 @@
* Add data-generator module to run OAP in testing mode, generating mock data for testing.
* Support receive Kubernetes processes from gRPC protocol.
* Fix the problem that es index(TimeSeriesTable, eg. endpoint_traffic, alarm_record) didn't create even after rerun with init-mode. This problem caused the OAP server to fail to start when the OAP server was down for more than a day.
+* Support autocomplete tags in traces query.
+* [Breaking Change] Replace all configurations `**_JETTY_**` to `**_REST_**`.
#### UI
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index a75d6f1054..cf50693651 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -7,10 +7,9 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | restHost| Binding IP of RESTful services. Services include GraphQL query and HTTP data report. |SW_CORE_REST_HOST| 0.0.0.0 |
| - | - | restPort | Binding port of RESTful services. | SW_CORE_REST_PORT| 12800 |
| - | - | restContextPath| Web context path of RESTful services. | SW_CORE_REST_CONTEXT_PATH| / |
-| - | - | restMinThreads| Minimum thread number of RESTful services. | SW_CORE_REST_JETTY_MIN_THREADS| 1 |
-| - | - | restMaxThreads| Maximum thread number of RESTful services. | SW_CORE_REST_JETTY_MAX_THREADS| 200 |
-| - | - | restIdleTimeOut| Connector idle timeout of RESTful services (in milliseconds). | SW_CORE_REST_JETTY_IDLE_TIMEOUT| 30000 |
-| - | - | restAcceptQueueSize| ServerSocketChannel Backlog of RESTful services. | SW_CORE_REST_JETTY_QUEUE_SIZE| 0 |
+| - | - | restMaxThreads| Maximum thread number of RESTful services. | SW_CORE_REST_MAX_THREADS| 200 |
+| - | - | restIdleTimeOut| Connector idle timeout of RESTful services (in milliseconds). | SW_CORE_REST_IDLE_TIMEOUT| 30000 |
+| - | - | restAcceptQueueSize| ServerSocketChannel Backlog of RESTful services. | SW_CORE_REST_QUEUE_SIZE| 0 |
| - | - | httpMaxRequestHeaderSize| Maximum request header size accepted. | SW_CORE_HTTP_MAX_REQUEST_HEADER_SIZE| 8192 |
| - | - | gRPCHost| Binding IP of gRPC services, including gRPC data report and internal communication among OAP nodes. |SW_CORE_GRPC_HOST| 0.0.0.0 |
| - | - | gRPCPort| Binding port of gRPC services. | SW_CORE_GRPC_PORT| 11800 |
@@ -166,10 +165,9 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | restHost| Binding IP of RESTful services. Services include GraphQL query and HTTP data report. | SW_RECEIVER_SHARING_REST_HOST | - |
| - | - | restPort | Binding port of RESTful services. | SW_RECEIVER_SHARING_REST_PORT | - |
| - | - | restContextPath| Web context path of RESTful services. | SW_RECEIVER_SHARING_REST_CONTEXT_PATH | - |
-| - | - | restMinThreads| Minimum thread number of RESTful services. | SW_RECEIVER_SHARING_JETTY_MIN_THREADS| 1 |
-| - | - | restMaxThreads| Maximum thread number of RESTful services. | SW_RECEIVER_SHARING_JETTY_MAX_THREADS| 200 |
-| - | - | restIdleTimeOut| Connector idle timeout of RESTful services (in milliseconds). | SW_RECEIVER_SHARING_JETTY_IDLE_TIMEOUT| 30000 |
-| - | - | restAcceptQueueSize| ServerSocketChannel backlog of RESTful services. | SW_RECEIVER_SHARING_JETTY_QUEUE_SIZE| 0 |
+| - | - | restMaxThreads| Maximum thread number of RESTful services. | SW_RECEIVER_SHARING_REST_MAX_THREADS| 200 |
+| - | - | restIdleTimeOut| Connector idle timeout of RESTful services (in milliseconds). | SW_RECEIVER_SHARING_REST_IDLE_TIMEOUT| 30000 |
+| - | - | restAcceptQueueSize| ServerSocketChannel backlog of RESTful services. | SW_RECEIVER_SHARING_REST_QUEUE_SIZE| 0 |
| - | - | httpMaxRequestHeaderSize| Maximum request header size accepted. | SW_RECEIVER_SHARING_HTTP_MAX_REQUEST_HEADER_SIZE| 8192 |
| - | - | gRPCHost| Binding IP of gRPC services. Services include gRPC data report and internal communication among OAP nodes. | SW_RECEIVER_GRPC_HOST | 0.0.0.0. Not Activated |
| - | - | gRPCPort| Binding port of gRPC services. | SW_RECEIVER_GRPC_PORT | Not Activated |
@@ -202,10 +200,16 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
| receiver-otel | default | A receiver for analyzing metrics data from OpenTelemetry. | - | - |
| - | - | enabledHandlers| Enabled handlers for otel. | SW_OTEL_RECEIVER_ENABLED_HANDLERS | - |
| - | - | enabledOcRules| Enabled metric rules for OC handler. | SW_OTEL_RECEIVER_ENABLED_OC_RULES | - |
-| receiver-zipkin |default| A receiver for Zipkin traces. | - | - |
-| - | - | restHost| Binding IP of RESTful services. |SW_RECEIVER_ZIPKIN_HOST| 0.0.0.0 |
-| - | - | restPort | Binding port of RESTful services. | SW_RECEIVER_ZIPKIN_PORT| 9411 |
-| - | - | restContextPath| Web context path of RESTful services. | SW_RECEIVER_ZIPKIN_CONTEXT_PATH| / |
+| receiver-zipkin |default| A receiver for Zipkin traces. | - | - |
+| - | - | restHost| Binding IP of RESTful services. |SW_RECEIVER_ZIPKIN_REST_HOST| 0.0.0.0 |
+| - | - | restPort | Binding port of RESTful services. | SW_RECEIVER_ZIPKIN_REST_PORT| 9411 |
+| - | - | restContextPath| Web context path of RESTful services. | SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH| / |
+| - | - | restMaxThreads| Maximum thread number of RESTful services. | SW_RECEIVER_ZIPKIN_REST_MAX_THREADS| 200 |
+| - | - | restIdleTimeOut| Connector idle timeout of RESTful services (in milliseconds). | SW_RECEIVER_ZIPKIN_REST_IDLE_TIMEOUT| 30000 |
+| - | - | restAcceptorPriorityDelta| Thread priority delta applied to acceptor threads of RESTful services. | SW_RECEIVER_ZIPKIN_REST_DELTA| 0 |
+| - | - | restAcceptQueueSize| ServerSocketChannel backlog of RESTful services. | SW_RECEIVER_ZIPKIN_REST_QUEUE_SIZE| 0 |
+| - | - | instanceNameRule| Get the instance name from these tags. | SW_RECEIVER_ZIPKIN_INSTANCE_NAME_RULE| [spring.instance_id,node_id] |
+| - | - | searchableTracesTags| Defines a set of span tag keys which are searchable. Multiple values are separated by commas. | SW_ZIPKIN_SEARCHABLE_TAG_KEYS| http.method |
| prometheus-fetcher | default | Prometheus fetcher reads metrics from Prometheus endpoint, and transfer the metrics into SkyWalking native format for the MAL engine. | - | - |
| - | - | enabledRules | Enabled rules. | SW_PROMETHEUS_FETCHER_ENABLED_RULES | self |
| - | - | maxConvertWorker | The maximize meter convert worker. | SW_PROMETHEUS_FETCHER_NUM_CONVERT_WORKER | -1(by default, half the number of CPU core(s)) |
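Because the renames in the vocabulary table above are a breaking change, a deployment that still exports the old variable names would silently fall back to defaults. The following is a minimal sketch of a migration helper, not part of the commit: the class name `RestEnvRename` and the `migrate` method are hypothetical, and the mapping lists only pairs that can be read directly off the removed and added rows above. The two `*_JETTY_MIN_THREADS` variables are dropped because their rows were removed without a successor.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, for illustration only: rewrites old variable names to the new ones.
final class RestEnvRename {
    // Old name -> new name, taken from the table rows in the hunks above.
    static final Map<String, String> RENAMES = new LinkedHashMap<>();
    // Variables whose rows were deleted without a replacement.
    static final Set<String> REMOVED = new HashSet<>(Arrays.asList(
        "SW_CORE_REST_JETTY_MIN_THREADS",
        "SW_RECEIVER_SHARING_JETTY_MIN_THREADS"));

    static {
        RENAMES.put("SW_CORE_REST_JETTY_IDLE_TIMEOUT", "SW_CORE_REST_IDLE_TIMEOUT");
        RENAMES.put("SW_CORE_REST_JETTY_QUEUE_SIZE", "SW_CORE_REST_QUEUE_SIZE");
        RENAMES.put("SW_RECEIVER_SHARING_JETTY_MAX_THREADS", "SW_RECEIVER_SHARING_REST_MAX_THREADS");
        RENAMES.put("SW_RECEIVER_SHARING_JETTY_IDLE_TIMEOUT", "SW_RECEIVER_SHARING_REST_IDLE_TIMEOUT");
        RENAMES.put("SW_RECEIVER_SHARING_JETTY_QUEUE_SIZE", "SW_RECEIVER_SHARING_REST_QUEUE_SIZE");
        RENAMES.put("SW_RECEIVER_ZIPKIN_HOST", "SW_RECEIVER_ZIPKIN_REST_HOST");
        RENAMES.put("SW_RECEIVER_ZIPKIN_PORT", "SW_RECEIVER_ZIPKIN_REST_PORT");
        RENAMES.put("SW_RECEIVER_ZIPKIN_CONTEXT_PATH", "SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH");
    }

    /** Returns a copy of env with old keys renamed and removed keys dropped. */
    static Map<String, String> migrate(Map<String, String> env) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : env.entrySet()) {
            if (REMOVED.contains(e.getKey())) {
                continue; // no successor exists in the new vocabulary
            }
            out.put(RENAMES.getOrDefault(e.getKey(), e.getKey()), e.getValue());
        }
        return out;
    }
}
```

Variables that are not listed pass through unchanged, so the helper can be run over a whole environment file.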
diff --git a/docs/en/setup/backend/zipkin-trace.md b/docs/en/setup/backend/zipkin-trace.md
index 450769aa76..6237e6ca29 100644
--- a/docs/en/setup/backend/zipkin-trace.md
+++ b/docs/en/setup/backend/zipkin-trace.md
@@ -8,14 +8,16 @@ Use the following config to activate it.
receiver-zipkin:
selector: ${SW_RECEIVER_ZIPKIN:-}
default:
- host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
- port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
- contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
- jettyMinThreads: ${SW_RECEIVER_ZIPKIN_JETTY_MIN_THREADS:1}
- jettyMaxThreads: ${SW_RECEIVER_ZIPKIN_JETTY_MAX_THREADS:200}
- jettyIdleTimeOut: ${SW_RECEIVER_ZIPKIN_JETTY_IDLE_TIMEOUT:30000}
- jettyAcceptorPriorityDelta: ${SW_RECEIVER_ZIPKIN_JETTY_DELTA:0}
- jettyAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_QUEUE_SIZE:0}
+ # For HTTP server
+ restHost: ${SW_RECEIVER_ZIPKIN_REST_HOST:0.0.0.0}
+ restPort: ${SW_RECEIVER_ZIPKIN_REST_PORT:9411}
+ restContextPath: ${SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH:/}
+ restMaxThreads: ${SW_RECEIVER_ZIPKIN_REST_MAX_THREADS:200}
+ restIdleTimeOut: ${SW_RECEIVER_ZIPKIN_REST_IDLE_TIMEOUT:30000}
+ restAcceptorPriorityDelta: ${SW_RECEIVER_ZIPKIN_REST_DELTA:0}
+ restAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_REST_QUEUE_SIZE:0}
+ instanceNameRule: ${SW_RECEIVER_ZIPKIN_INSTANCE_NAME_RULE:[spring.instance_id,node_id]}
+ searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
```
NOTE: Zipkin receiver requires `zipkin-elasticsearch` storage implementation to be activated.
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
index 0f03d4be6c..bf69cef469 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
@@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
+import org.apache.skywalking.oap.server.core.source.TraceTagAutocomplete;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.strategy.SegmentStatusAnalyzer;
@@ -173,6 +174,18 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
segment.setEndpointId(endpointId);
sourceReceiver.receive(segment);
+ addAutocompleteTags();
+ }
+
+ private void addAutocompleteTags() {
+ segment.getTags().forEach(tag -> {
+ TraceTagAutocomplete tagAutocomplete = new TraceTagAutocomplete();
+ tagAutocomplete.setTag(tag.toString());
+ tagAutocomplete.setTagKey(tag.getKey());
+ tagAutocomplete.setTagValue(tag.getValue());
+ tagAutocomplete.setTimeBucket(TimeBucket.getMinuteTimeBucket(segment.getStartTime()));
+ sourceReceiver.receive(tagAutocomplete);
+ });
}
private enum SAMPLE_STATUS {
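The `addAutocompleteTags()` hunk above stamps every tag with a minute-level time bucket derived from the segment start time. The sketch below illustrates that conversion under one assumption: that a minute bucket is the timestamp rendered as a `yyyyMMddHHmm` number. `MinuteBucketSketch` is a hypothetical stand-in, not the project's `TimeBucket` class, and it takes the time zone explicitly so the result is reproducible.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustration only; assumes the yyyyMMddHHmm bucket layout.
final class MinuteBucketSketch {
    private static final DateTimeFormatter MINUTE_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    /** Truncates an epoch-millisecond timestamp to a minute bucket in the given zone. */
    static long minuteBucket(long epochMillis, ZoneId zone) {
        String text = MINUTE_FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
        return Long.parseLong(text);
    }
}
```

Seconds and milliseconds are discarded, so all tags reported within the same minute share one bucket value.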
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/searchtag/TagAutocompleteData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/searchtag/TagAutocompleteData.java
new file mode 100644
index 0000000000..d0f19f2a1b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/searchtag/TagAutocompleteData.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.searchtag;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+@EqualsAndHashCode(of = {
+ "tag"
+})
+public abstract class TagAutocompleteData extends Metrics {
+ public static final String TAG_KEY = "tag_key";
+ public static final String TAG_VALUE = "tag_value";
+
+ @Setter
+ @Getter
+ private String tag;
+ @Setter
+ @Getter
+ @Column(columnName = TAG_KEY)
+ private String tagKey;
+ @Setter
+ @Getter
+ @Column(columnName = TAG_VALUE)
+ private String tagValue;
+
+ @Override
+ public boolean combine(final Metrics metrics) {
+ return true;
+ }
+
+ @Override
+ public void calculate() {
+
+ }
+
+ @Override
+ public Metrics toHour() {
+ return null;
+ }
+
+ @Override
+ public Metrics toDay() {
+ return null;
+ }
+
+ @Override
+ protected String id0() {
+ return toTimeBucketInDay() + "-" + tag;
+ }
+
+ @Override
+ public int remoteHashCode() {
+ return this.hashCode();
+ }
+
+ @Override
+ public void deserialize(final RemoteData remoteData) {
+ setTag(remoteData.getDataStrings(0));
+ setTagKey(remoteData.getDataStrings(1));
+ setTagValue(remoteData.getDataStrings(2));
+ setTimeBucket(remoteData.getDataLongs(0));
+ }
+
+ @Override
+ public RemoteData.Builder serialize() {
+ final RemoteData.Builder builder = RemoteData.newBuilder();
+ builder.addDataStrings(tag);
+ builder.addDataStrings(tagKey);
+ builder.addDataStrings(tagValue);
+ builder.addDataLongs(getTimeBucket());
+ return builder;
+ }
+}
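In `TagAutocompleteData` above, `id0()` joins the day-level time bucket with the tag string, so the same tag seen many times within one day maps to a single ID. A minimal sketch of that effect follows. It assumes that `toTimeBucketInDay()` reduces a `yyyyMMddHHmm` minute bucket to `yyyyMMdd`, and that the tag string has the `key=value` form used by the storage builder in this commit; `TagAutocompleteIdSketch` is a hypothetical name.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Illustration only; not the project's Metrics implementation.
final class TagAutocompleteIdSketch {
    /** Assumed day reduction: drop the HHmm digits of a minute bucket. */
    static long toDayBucket(long minuteBucket) {
        return minuteBucket / 10_000;
    }

    /** Mirrors the shape of id0(): day bucket, a dash, then "key=value". */
    static String id(long minuteBucket, String tagKey, String tagValue) {
        return toDayBucket(minuteBucket) + "-" + tagKey + "=" + tagValue;
    }

    public static void main(String[] args) {
        Set<String> ids = new LinkedHashSet<>();
        ids.add(id(202204291252L, "http.method", "GET"));
        ids.add(id(202204291830L, "http.method", "GET")); // same day, same tag: no new ID
        ids.add(id(202204300001L, "http.method", "GET")); // next day: new ID
        System.out.println(ids);
    }
}
```

Together with `combine()` returning `true` without merging any fields, this would make the stored data a per-day set of distinct tags rather than a counter.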
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/TraceTagAutocompleteData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/TraceTagAutocompleteData.java
new file mode 100644
index 0000000000..7b571513a0
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/TraceTagAutocompleteData.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.segment;
+
+import lombok.EqualsAndHashCode;
+import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+@Stream(name = TraceTagAutocompleteData.INDEX_NAME, scopeId = DefaultScopeDefine.TRACE_TAG_AUTOCOMPLETE,
+ builder = TraceTagAutocompleteData.Builder.class, processor = MetricsStreamProcessor.class)
+@MetricsExtension(supportDownSampling = false, supportUpdate = false, timeRelativeID = true)
+@EqualsAndHashCode(callSuper = true)
+public class TraceTagAutocompleteData extends TagAutocompleteData {
+ public static final String INDEX_NAME = "trace_tag_autocomplete";
+
+ public static class Builder implements StorageBuilder<TraceTagAutocompleteData> {
+ @Override
+ public TraceTagAutocompleteData storage2Entity(final Convert2Entity converter) {
+ TraceTagAutocompleteData record = new TraceTagAutocompleteData();
+ record.setTagKey((String) converter.get(TAG_KEY));
+ record.setTagValue((String) converter.get(TAG_VALUE));
+ record.setTag(record.getTagKey() + "=" + record.getTagValue());
+ return record;
+ }
+
+ @Override
+ public void entity2Storage(final TraceTagAutocompleteData storageData, final Convert2Storage converter) {
+ converter.accept(TAG_KEY, storageData.getTagKey());
+ converter.accept(TAG_VALUE, storageData.getTagValue());
+ converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+ }
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/TraceTagAutocompleteDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/TraceTagAutocompleteDispatcher.java
new file mode 100644
index 0000000000..d1de083065
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/TraceTagAutocompleteDispatcher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.segment;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.TraceTagAutocomplete;
+
+public class TraceTagAutocompleteDispatcher implements SourceDispatcher<TraceTagAutocomplete> {
+
+ @Override
+ public void dispatch(TraceTagAutocomplete source) {
+ TraceTagAutocompleteData autocomplete = new TraceTagAutocompleteData();
+ autocomplete.setTag(source.getTag());
+ autocomplete.setTagKey(source.getTagKey());
+ autocomplete.setTagValue(source.getTagValue());
+ autocomplete.setTimeBucket(source.getTimeBucket());
+ MetricsStreamProcessor.getInstance().in(autocomplete);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
index 35af099f28..b58ea08557 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
@@ -128,6 +129,18 @@ public class TraceQueryService implements Service {
return trace;
}
+ public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ return getTraceQueryDAO().queryTraceTagAutocompleteKeys(startSecondTB, endSecondTB);
+ }
+
+ public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
+ final int limit,
+ final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ return getTraceQueryDAO().queryTraceTagAutocompleteValues(tagKey, limit, startSecondTB, endSecondTB);
+ }
+
private List<Span> buildSpanList(SegmentObject segmentObject) {
List<Span> spans = new ArrayList<>();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index d3cc706128..596be80d7d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -101,6 +101,7 @@ public class DefaultScopeDefine {
public static final int EBPF_PROFILING_SCHEDULE = 47;
public static final int EBPF_PROFILING_DATA = 48;
public static final int SERVICE_LABEL = 49;
+ public static final int TRACE_TAG_AUTOCOMPLETE = 50;
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TagAutocomplete.java
similarity index 61%
copy from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TagAutocomplete.java
index fe42705fd2..c708e0d1e4 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TagAutocomplete.java
@@ -16,24 +16,31 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin;
+package org.apache.skywalking.oap.server.core.source;
import lombok.Getter;
import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import java.util.ArrayList;
-import java.util.List;
+public abstract class TagAutocomplete extends Source {
+
+ @Override
+ public int scope() {
+ return DefaultScopeDefine.TRACE_TAG_AUTOCOMPLETE;
+ }
+
+ @Override
+ public String getEntityId() {
+ return tag;
+ }
+
+ @Setter
+ @Getter
+ private String tag;
+ @Setter
+ @Getter
+ private String tagKey;
+ @Setter
+ @Getter
+ private String tagValue;
-@Setter
-@Getter
-public class ZipkinReceiverConfig extends ModuleConfig {
- private String host;
- private int port;
- private String contextPath;
- private int maxThreads = 200;
- private long idleTimeOut = 30000;
- private int acceptorPriorityDelta = 0;
- private int acceptQueueSize = 0;
- private List<String> instanceNameRule = new ArrayList<>();
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TraceTagAutocomplete.java
similarity index 57%
copy from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TraceTagAutocomplete.java
index fe42705fd2..d21ab7b567 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/TraceTagAutocomplete.java
@@ -16,24 +16,9 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.zipkin;
+package org.apache.skywalking.oap.server.core.source;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+@ScopeDeclaration(id = DefaultScopeDefine.TRACE_TAG_AUTOCOMPLETE, name = "TraceTagAutocomplete")
+public class TraceTagAutocomplete extends TagAutocomplete {
-import java.util.ArrayList;
-import java.util.List;
-
-@Setter
-@Getter
-public class ZipkinReceiverConfig extends ModuleConfig {
- private String host;
- private int port;
- private String contextPath;
- private int maxThreads = 200;
- private long idleTimeOut = 30000;
- private int acceptorPriorityDelta = 0;
- private int acceptQueueSize = 0;
- private List<String> instanceNameRule = new ArrayList<>();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
index 7db5741231..facfae45f1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
@@ -50,4 +51,12 @@ public interface ITraceQueryDAO extends Service {
* This method gives more flexible for 3rd trace without segment concept, which can't search data through {@link #queryByTraceId(String)}
*/
List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
+
+ Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
+ final long endSecondTB) throws IOException;
+
+ Set<String> queryTraceTagAutocompleteValues(final String tagKey,
+ final int limit,
+ final long startSecondTB,
+ final long endSecondTB) throws IOException;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java
index 2db4705090..e9cdaff98f 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java
@@ -21,10 +21,12 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import graphql.kickstart.tools.GraphQLQueryResolver;
import com.google.common.base.Strings;
import java.io.IOException;
+import java.util.Set;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.input.TraceQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.Pagination;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
@@ -81,4 +83,12 @@ public class TraceQuery implements GraphQLQueryResolver {
public Trace queryTrace(final String traceId) throws IOException {
return getQueryService().queryTrace(traceId);
}
+
+ public Set<String> queryTraceTagAutocompleteKeys(final Duration queryDuration) throws IOException {
+ return getQueryService().queryTraceTagAutocompleteKeys(queryDuration.getStartTimeBucketInSec(), queryDuration.getEndTimeBucketInSec());
+ }
+
+ public Set<String> queryTraceTagAutocompleteValues(final String tagKey, final Duration queryDuration) throws IOException {
+ return getQueryService().queryTraceTagAutocompleteValues(tagKey, 100, queryDuration.getStartTimeBucketInSec(), queryDuration.getEndTimeBucketInSec());
+ }
}
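The two resolver methods above delegate to `queryTraceTagAutocompleteKeys` and `queryTraceTagAutocompleteValues`, with the value query capped at 100 entries. The in-memory sketch below shows one plausible reading of that contract: distinct keys within a time range, and distinct values for one key up to a limit. The semantics are inferred from the method signatures only; `InMemoryTagAutocomplete` is hypothetical, and it uses a single time-bucket unit for both data and query, whereas a real storage plugin would have to reconcile second-level query bounds with minute-level records.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustration only; not one of the storage plugins touched by this commit.
final class InMemoryTagAutocomplete {
    private static final class Row {
        final String key;
        final String value;
        final long timeBucket;

        Row(String key, String value, long timeBucket) {
            this.key = key;
            this.value = value;
            this.timeBucket = timeBucket;
        }
    }

    private final List<Row> rows = new ArrayList<>();

    void add(String key, String value, long timeBucket) {
        rows.add(new Row(key, value, timeBucket));
    }

    /** Distinct tag keys seen in [startTB, endTB]. */
    Set<String> keys(long startTB, long endTB) {
        Set<String> out = new LinkedHashSet<>();
        for (Row r : rows) {
            if (r.timeBucket >= startTB && r.timeBucket <= endTB) {
                out.add(r.key);
            }
        }
        return out;
    }

    /** Distinct values of one key in [startTB, endTB], at most limit entries. */
    Set<String> values(String tagKey, int limit, long startTB, long endTB) {
        Set<String> out = new LinkedHashSet<>();
        for (Row r : rows) {
            if (out.size() >= limit) {
                break;
            }
            if (r.key.equals(tagKey) && r.timeBucket >= startTB && r.timeBucket <= endTB) {
                out.add(r.value);
            }
        }
        return out;
    }
}
```

Returning a `Set` matches the interface's return type and reflects that a UI autocomplete list only needs each suggestion once.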
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index 4fb447b78d..f40f3914b3 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit 4fb447b78d6e31912cbd8c8ae82cca1fc80861de
+Subproject commit f40f3914b39efb6590bc51a7014dc5080361d9ba
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
index fe42705fd2..120f8b731b 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.receiver.zipkin;
import lombok.Getter;
import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import java.util.ArrayList;
@@ -28,12 +29,18 @@ import java.util.List;
@Setter
@Getter
public class ZipkinReceiverConfig extends ModuleConfig {
- private String host;
- private int port;
- private String contextPath;
- private int maxThreads = 200;
- private long idleTimeOut = 30000;
- private int acceptorPriorityDelta = 0;
- private int acceptQueueSize = 0;
+ private String restHost;
+ private int restPort;
+ private String restContextPath;
+ private int restMaxThreads = 200;
+ private long restIdleTimeOut = 30000;
+ private int restAcceptorPriorityDelta = 0;
+ private int restAcceptQueueSize = 0;
private List<String> instanceNameRule = new ArrayList<>();
+ private String searchableTracesTags = DEFAULT_SEARCHABLE_TAG_KEYS;
+
+ private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join(
+ Const.COMMA,
+ "http.method"
+ );
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
index 63948bba43..37b4668a2f 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
@@ -63,14 +63,14 @@ public class ZipkinReceiverProvider extends ModuleProvider {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
HTTPServerConfig httpServerConfig = HTTPServerConfig.builder()
- .host(config.getHost())
- .port(config.getPort())
- .contextPath(config.getContextPath())
- .idleTimeOut(config.getIdleTimeOut())
+ .host(config.getRestHost())
+ .port(config.getRestPort())
+ .contextPath(config.getRestContextPath())
+ .idleTimeOut(config.getRestIdleTimeOut())
.acceptorPriorityDelta(
- config.getAcceptorPriorityDelta())
- .maxThreads(config.getMaxThreads())
- .acceptQueueSize(config.getAcceptQueueSize())
+ config.getRestAcceptorPriorityDelta())
+ .maxThreads(config.getRestMaxThreads())
+ .acceptQueueSize(config.getRestAcceptQueueSize())
.build();
httpServer = new HTTPServer(httpServerConfig);
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
index 424652300c..485c41bcb1 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
@@ -18,10 +18,13 @@
package org.apache.skywalking.oap.server.receiver.zipkin.trace;
+import java.util.Arrays;
import java.util.List;
import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.source.TraceTagAutocomplete;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -86,8 +89,13 @@ public class SpanForward {
zipkinSpan.setLatency((int) latency);
zipkinSpan.setDataBinary(SpanBytesEncoder.PROTO3.encode(span));
+ List<String> searchTagKeys = Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA));
span.tags().forEach((key, value) -> {
- zipkinSpan.getTags().add(key + "=" + value);
+ if (searchTagKeys.contains(key)) {
+ String tagString = key + "=" + value;
+ zipkinSpan.getTags().add(tagString);
+ addAutocompleteTags(minuteTimeBucket, key, value, tagString);
+ }
});
receiver.receive(zipkinSpan);
@@ -102,6 +110,15 @@ public class SpanForward {
});
}
+ private void addAutocompleteTags(final long minuteTimeBucket, final String key, final String value, final String tagString) {
+ TraceTagAutocomplete tagAutocomplete = new TraceTagAutocomplete();
+ tagAutocomplete.setTag(tagString);
+ tagAutocomplete.setTagKey(key);
+ tagAutocomplete.setTagValue(value);
+ tagAutocomplete.setTimeBucket(minuteTimeBucket);
+ receiver.receive(tagAutocomplete);
+ }
+
private String getServiceInstanceName(Span span) {
for (String tagName : config.getInstanceNameRule()) {
String serviceInstanceName = span.tags().get(tagName);
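The SpanForward hunk above stops indexing every Zipkin tag and keeps only those whose key appears in the comma-separated searchableTracesTags setting. A minimal sketch of that filter follows, using only java.util. The class and method names are hypothetical, and the literal "," stands in for Const.COMMA, which is assumed to be a plain comma.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: SearchableTagSketch and filterSearchableTags are hypothetical names.
final class SearchableTagSketch {

    // Keep only the tags whose key is in the comma-separated allow-list and
    // render each one as "key=value", mirroring how tagString is built above.
    static List<String> filterSearchableTags(String searchableKeys, Map<String, String> spanTags) {
        // Assumption: "," plays the role of Const.COMMA; entries are not trimmed.
        List<String> allowed = Arrays.asList(searchableKeys.split(","));
        List<String> kept = new ArrayList<>();
        spanTags.forEach((key, value) -> {
            if (allowed.contains(key)) {
                kept.add(key + "=" + value);
            }
        });
        return kept;
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("http.method", "GET");
        tags.put("http.path", "/orders");
        // Only http.method is in the allow-list, so http.path is dropped.
        System.out.println(filterSearchableTags("http.method", tags));
    }
}
```

Because split(",") does not trim, a value such as "http.method, http.path" would leave a leading space on the second key in this sketch; whether the real configuration tolerates spaces is not shown in the diff.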
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 6f9589148e..14b385e575 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -74,10 +74,9 @@ core:
restHost: ${SW_CORE_REST_HOST:0.0.0.0}
restPort: ${SW_CORE_REST_PORT:12800}
restContextPath: ${SW_CORE_REST_CONTEXT_PATH:/}
- restMinThreads: ${SW_CORE_REST_JETTY_MIN_THREADS:1}
- restMaxThreads: ${SW_CORE_REST_JETTY_MAX_THREADS:200}
- restIdleTimeOut: ${SW_CORE_REST_JETTY_IDLE_TIMEOUT:30000}
- restAcceptQueueSize: ${SW_CORE_REST_JETTY_QUEUE_SIZE:0}
+ restMaxThreads: ${SW_CORE_REST_MAX_THREADS:200}
+ restIdleTimeOut: ${SW_CORE_REST_IDLE_TIMEOUT:30000}
+ restAcceptQueueSize: ${SW_CORE_REST_QUEUE_SIZE:0}
httpMaxRequestHeaderSize: ${SW_CORE_HTTP_MAX_REQUEST_HEADER_SIZE:8192}
gRPCHost: ${SW_CORE_GRPC_HOST:0.0.0.0}
gRPCPort: ${SW_CORE_GRPC_PORT:11800}
@@ -288,14 +287,13 @@ event-analyzer:
receiver-sharing-server:
selector: ${SW_RECEIVER_SHARING_SERVER:default}
default:
- # For Jetty server
+ # For HTTP server
restHost: ${SW_RECEIVER_SHARING_REST_HOST:0.0.0.0}
restPort: ${SW_RECEIVER_SHARING_REST_PORT:0}
restContextPath: ${SW_RECEIVER_SHARING_REST_CONTEXT_PATH:/}
- restMinThreads: ${SW_RECEIVER_SHARING_JETTY_MIN_THREADS:1}
- restMaxThreads: ${SW_RECEIVER_SHARING_JETTY_MAX_THREADS:200}
- restIdleTimeOut: ${SW_RECEIVER_SHARING_JETTY_IDLE_TIMEOUT:30000}
- restAcceptQueueSize: ${SW_RECEIVER_SHARING_JETTY_QUEUE_SIZE:0}
+ restMaxThreads: ${SW_RECEIVER_SHARING_REST_MAX_THREADS:200}
+ restIdleTimeOut: ${SW_RECEIVER_SHARING_REST_IDLE_TIMEOUT:30000}
+ restAcceptQueueSize: ${SW_RECEIVER_SHARING_REST_QUEUE_SIZE:0}
httpMaxRequestHeaderSize: ${SW_RECEIVER_SHARING_HTTP_MAX_REQUEST_HEADER_SIZE:8192}
# For gRPC server
gRPCHost: ${SW_RECEIVER_GRPC_HOST:0.0.0.0}
@@ -384,15 +382,16 @@ receiver-otel:
receiver-zipkin:
selector: ${SW_RECEIVER_ZIPKIN:-}
default:
- host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
- port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
- contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
- jettyMinThreads: ${SW_RECEIVER_ZIPKIN_JETTY_MIN_THREADS:1}
- jettyMaxThreads: ${SW_RECEIVER_ZIPKIN_JETTY_MAX_THREADS:200}
- jettyIdleTimeOut: ${SW_RECEIVER_ZIPKIN_JETTY_IDLE_TIMEOUT:30000}
- jettyAcceptorPriorityDelta: ${SW_RECEIVER_ZIPKIN_JETTY_DELTA:0}
- jettyAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_QUEUE_SIZE:0}
+ # For HTTP server
+ restHost: ${SW_RECEIVER_ZIPKIN_REST_HOST:0.0.0.0}
+ restPort: ${SW_RECEIVER_ZIPKIN_REST_PORT:9411}
+ restContextPath: ${SW_RECEIVER_ZIPKIN_REST_CONTEXT_PATH:/}
+ restMaxThreads: ${SW_RECEIVER_ZIPKIN_REST_MAX_THREADS:200}
+ restIdleTimeOut: ${SW_RECEIVER_ZIPKIN_REST_IDLE_TIMEOUT:30000}
+ restAcceptorPriorityDelta: ${SW_RECEIVER_ZIPKIN_REST_DELTA:0}
+ restAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_REST_QUEUE_SIZE:0}
instanceNameRule: ${SW_RECEIVER_ZIPKIN_INSTANCE_NAME_RULE:[spring.instance_id,node_id]}
+ searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
receiver-browser:
selector: ${SW_RECEIVER_BROWSER:default}
diff --git a/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java b/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
index ef42341084..a30f263818 100644
--- a/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
+++ b/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
@@ -65,7 +65,7 @@ public class ApplicationConfigLoaderTestCase {
public void testLoadStringTypeConfig() {
Properties providerConfig = applicationConfiguration.getModuleConfiguration("receiver-zipkin")
.getProviderConfiguration("default");
- String host = (String) providerConfig.get("host");
+ String host = (String) providerConfig.get("restHost");
assertEquals("0.0.0.0", host);
}
@@ -73,7 +73,7 @@ public class ApplicationConfigLoaderTestCase {
public void testLoadIntegerTypeConfig() {
Properties providerConfig = applicationConfiguration.getModuleConfiguration("receiver-zipkin")
.getProviderConfiguration("default");
- Integer port = (Integer) providerConfig.get("port");
+ Integer port = (Integer) providerConfig.get("restPort");
assertEquals(Integer.valueOf(9411), port);
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
index 05f9dc5795..d142dc75ac 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
@@ -22,18 +22,23 @@ import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
+import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
@@ -182,4 +187,69 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
return Collections.emptyList();
}
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ BoolQueryBuilder query = Query.bool();
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
+ final SearchBuilder search = Search.builder().query(query);
+ search.aggregation(Aggregation.terms(TraceTagAutocompleteData.TAG_KEY)
+ .field(TraceTagAutocompleteData.TAG_KEY));
+
+ final SearchResponse response = getClient().search(
+ new TimeRangeIndexNameGenerator(TraceTagAutocompleteData.INDEX_NAME, startSecondTB, endSecondTB),
+ search.build()
+ );
+ Map<String, Object> terms =
+ (Map<String, Object>) response.getAggregations().get(TraceTagAutocompleteData.TAG_KEY);
+ List<Map<String, Object>> buckets = (List<Map<String, Object>>) terms.get("buckets");
+ Set<String> tagKeys = new HashSet<>();
+ for (Map<String, Object> bucket : buckets) {
+ String tagKey = (String) bucket.get("key");
+ if (tagKey == null) {
+ continue;
+ }
+ tagKeys.add(tagKey);
+ }
+ return tagKeys;
+ }
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteValues(final String tagKey, final int limit, final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ BoolQueryBuilder query = Query.bool().must(Query.term(TraceTagAutocompleteData.TAG_KEY, tagKey));
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
+ final SearchBuilder search = Search.builder().query(query).size(limit);
+
+ final SearchResponse response = getClient().search(
+ new TimeRangeIndexNameGenerator(
+ IndexController.LogicIndicesRegister.getPhysicalTableName(TraceTagAutocompleteData.INDEX_NAME),
+ startSecondTB, endSecondTB
+ ),
+ search.build()
+ );
+ Set<String> tagValues = new HashSet<>();
+ for (SearchHit searchHit : response.getHits().getHits()) {
+ TraceTagAutocompleteData tag = new TraceTagAutocompleteData.Builder().storage2Entity(
+ new HashMapConverter.ToEntity(searchHit.getSource()));
+ tagValues.add(tag.getTagValue());
+ }
+ return tagValues;
+ }
+
+ private void appendTagAutocompleteCondition(final long startSecondTB, final long endSecondTB, final BoolQueryBuilder query) {
+ long startMinTB = startSecondTB / 100;
+ long endMinTB = endSecondTB / 100;
+ final RangeQueryBuilder rangeQuery = Query.range(TraceTagAutocompleteData.TIME_BUCKET);
+ if (startMinTB > 0) {
+ rangeQuery.gte(startMinTB);
+ }
+ if (endMinTB > 0) {
+ rangeQuery.lte(endMinTB);
+ }
+ if (startMinTB > 0 || endMinTB > 0) {
+ query.must(rangeQuery);
+ }
+ }
}
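Each storage plugin in this commit derives a minute-precision bucket from the second-precision one by integer division by 100. A small sketch of that arithmetic follows, assuming the second bucket is a long shaped like yyyyMMddHHmmss (an assumption about the bucket format, not something this diff states). The class and method names are hypothetical.

```java
// Illustrative only: TimeBucketSketch is a hypothetical name.
final class TimeBucketSketch {

    // Assumes a second bucket shaped like yyyyMMddHHmmss; dropping the last
    // two decimal digits leaves yyyyMMddHHmm, the minute bucket.
    static long toMinuteBucket(long secondTimeBucket) {
        return secondTimeBucket / 100;
    }

    public static void main(String[] args) {
        System.out.println(toMinuteBucket(20220429125245L)); // prints 202204291252
        System.out.println(toMinuteBucket(0L));              // prints 0
    }
}
```

An unset bound of 0 stays 0 after the division, which is why the `> 0` guards in appendTagAutocompleteCondition can skip it.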
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
index 606470195b..06896feee6 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
@@ -50,5 +50,9 @@ public interface InfluxConstants {
String INSTANCE_ID = "_instance_id";
String AGENT_ID = "_agent_id";
+
+ String AUTOCOMPLETE_TAG_KEY = "_autocomplete_tag_key";
+
+ String AUTOCOMPLETE_TAG_VALUE = "_autocomplete_tag_value";
}
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
index c383ab6d2a..562c73e60b 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
@@ -28,7 +28,9 @@ import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
@@ -103,6 +105,15 @@ public class TableMetaInfo {
}
}
+ if (model.getName().equals(TraceTagAutocompleteData.INDEX_NAME)) {
+ if (storageAndColumnMap.containsKey(TagAutocompleteData.TAG_KEY)) {
+ storageAndTagMap.put(TagAutocompleteData.TAG_KEY, InfluxConstants.TagName.AUTOCOMPLETE_TAG_KEY);
+ }
+ if (storageAndColumnMap.containsKey(TagAutocompleteData.TAG_VALUE)) {
+ storageAndTagMap.put(TagAutocompleteData.TAG_VALUE, InfluxConstants.TagName.AUTOCOMPLETE_TAG_VALUE);
+ }
+ }
+
final TableMetaInfo info = TableMetaInfo.builder()
.model(model)
.storageAndTagMap(storageAndTagMap)
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
index 42df36280e..54bf0e07ba 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
@@ -23,8 +23,11 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
@@ -223,4 +226,71 @@ public class TraceQuery implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) {
return Collections.emptyList();
}
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
+ final long endSecondTB) throws IOException {
+
+ WhereQueryImpl<SelectQueryImpl> query = select()
+ .function("distinct", TraceTagAutocompleteData.TAG_KEY)
+ .from(client.getDatabase(), TraceTagAutocompleteData.INDEX_NAME)
+ .where();
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
+
+ QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result set: {}", query.getCommand(), series);
+ }
+ if (series == null) {
+ return Collections.emptySet();
+ }
+ Set<String> tagKeys = new HashSet<>();
+ for (List<Object> values : series.getValues()) {
+ String tagKey = (String) values.get(1);
+ tagKeys.add(tagKey);
+ }
+
+ return tagKeys;
+ }
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
+ final int limit,
+ final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ WhereQueryImpl<SelectQueryImpl> query = select()
+ .column(TraceTagAutocompleteData.TAG_VALUE)
+ .from(client.getDatabase(), TraceTagAutocompleteData.INDEX_NAME)
+ .where();
+ query.limit(limit);
+ query.and(eq(TraceTagAutocompleteData.TAG_KEY, tagKey));
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
+ QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result set: {}", query.getCommand(), series);
+ }
+ if (series == null) {
+ return Collections.emptySet();
+ }
+ Set<String> tagValues = new HashSet<>();
+ for (List<Object> values : series.getValues()) {
+ String tagValue = (String) values.get(1);
+ tagValues.add(tagValue);
+ }
+
+ return tagValues;
+ }
+
+ private void appendTagAutocompleteCondition(final long startSecondTB,
+ final long endSecondTB,
+ final WhereQueryImpl<SelectQueryImpl> query) {
+ long startMinTB = startSecondTB / 100;
+ long endMinTB = endSecondTB / 100;
+ if (startMinTB > 0) {
+ query.and(gte(TraceTagAutocompleteData.TIME_BUCKET, startMinTB));
+ }
+ if (endMinTB > 0) {
+ query.and(lte(TraceTagAutocompleteData.TIME_BUCKET, endMinTB));
+ }
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java
index 37c61c48b0..4697abe778 100644
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java
@@ -29,10 +29,11 @@ public interface IoTDBIndexes {
String INSTANCE_ID_INX = "instance_id";
String PROCESS_ID_INX = "process_id";
String AGENT_ID_INX = "agent_id";
+ String AUTOCOMPLETE_TAG_KEY = "tag_key";
static boolean isIndex(String key) {
return key.equals(ID_IDX) || key.equals(ENTITY_ID_IDX) || key.equals(LAYER_IDX) ||
key.equals(SERVICE_ID_IDX) || key.equals(GROUP_IDX) || key.equals(TRACE_ID_IDX) ||
- key.equals(INSTANCE_ID_INX) || key.equals(AGENT_ID_INX) || key.equals(PROCESS_ID_INX);
+ key.equals(INSTANCE_ID_INX) || key.equals(AGENT_ID_INX) || key.equals(PROCESS_ID_INX) || key.equals(AUTOCOMPLETE_TAG_KEY);
}
}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java
index 2567cde7f3..4ebbb4b9ec 100644
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java
@@ -85,6 +85,9 @@ public class IoTDBTableMetaInfo {
if (storageAndIndexMap.containsValue(IoTDBIndexes.AGENT_ID_INX)) {
indexes.add(IoTDBIndexes.AGENT_ID_INX);
}
+ if (storageAndIndexMap.containsValue(IoTDBIndexes.AUTOCOMPLETE_TAG_KEY)) {
+ indexes.add(IoTDBIndexes.AUTOCOMPLETE_TAG_KEY);
+ }
final IoTDBTableMetaInfo tableMetaInfo = IoTDBTableMetaInfo.builder()
.model(model)
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java
index ec5584e8d3..cd86b8bfdd 100644
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java
@@ -18,18 +18,27 @@
package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import lombok.RequiredArgsConstructor;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
@@ -173,4 +182,82 @@ public class IoTDBTraceQueryDAO implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) {
return Collections.emptyList();
}
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select *").append(" from ");
+ IoTDBUtils.addModelPath(client.getStorageGroup(), query, TraceTagAutocompleteData.INDEX_NAME);
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ Set<String> tagKeys = new HashSet<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(query.toString());
+ while (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<String> resultList = Splitter.on(IoTDBClient.DOT + "\"")
+ .splitToList(rowRecord.getFields().get(0).getStringValue());
+ String tagKey = resultList.get(resultList.size() - 1);
+ tagKeys.add(tagKey.substring(0, tagKey.length() - 1));
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+
+ return tagKeys;
+ }
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
+ final int limit,
+ final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ IoTDBUtils.addModelPath(client.getStorageGroup(), query, TraceTagAutocompleteData.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.AUTOCOMPLETE_TAG_KEY, tagKey);
+ IoTDBUtils.addQueryIndexValue(TraceTagAutocompleteData.INDEX_NAME, query, indexAndValueMap);
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
+ query.append(" limit ").append(limit).append(IoTDBClient.ALIGN_BY_DEVICE);
+ List<? super StorageData> storageDataList = client.filterQuery(TraceTagAutocompleteData.INDEX_NAME,
+ query.toString(),
+ new TraceTagAutocompleteData.Builder()
+ );
+
+ Set<String> tagValues = new HashSet<>();
+ storageDataList.forEach(storageData -> {
+ TraceTagAutocompleteData tagAutocompleteData = (TraceTagAutocompleteData) storageData;
+ tagValues.add(tagAutocompleteData.getTagValue());
+ });
+
+ return tagValues;
+ }
+
+ private void appendTagAutocompleteCondition(final long startSecondTB, final long endSecondTB, final StringBuilder query) {
+ long startMinTB = startSecondTB / 100;
+ long endMinTB = endSecondTB / 100;
+
+ StringBuilder where = new StringBuilder();
+ if (startMinTB > 0) {
+ where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startMinTB));
+ }
+ if (endMinTB > 0) {
+ if (where.length() > 0) {
+ where.append(" and ");
+ }
+ where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endMinTB));
+ }
+ if (where.length() > 0) {
+ query.append(" where ").append(where);
+ }
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
index 4681c0aa49..80ae971464 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
@@ -27,7 +27,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
@@ -253,4 +256,71 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) {
return Collections.emptyList();
}
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ StringBuilder sql = new StringBuilder();
+ List<Object> condition = new ArrayList<>(2);
+
+ sql.append("select distinct ").append(TraceTagAutocompleteData.TAG_KEY).append(" from ")
+ .append(TraceTagAutocompleteData.INDEX_NAME).append(" where ");
+ sql.append(" 1=1 ");
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, sql, condition);
+ try (Connection connection = h2Client.getConnection()) {
+ ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
+ Set<String> tagKeys = new HashSet<>();
+ while (resultSet.next()) {
+ tagKeys.add(resultSet.getString(TraceTagAutocompleteData.TAG_KEY));
+ }
+ return tagKeys;
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
+ final int limit,
+ final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ StringBuilder sql = new StringBuilder();
+ List<Object> condition = new ArrayList<>(3);
+ sql.append("select * from ").append(TraceTagAutocompleteData.INDEX_NAME).append(" where ");
+ sql.append(TraceTagAutocompleteData.TAG_KEY).append(" = ?");
+ condition.add(tagKey);
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, sql, condition);
+ sql.append(" limit ").append(limit);
+
+ try (Connection connection = h2Client.getConnection()) {
+ ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
+ Set<String> tagValues = new HashSet<>();
+ while (resultSet.next()) {
+ tagValues.add(resultSet.getString(TraceTagAutocompleteData.TAG_VALUE));
+ }
+ return tagValues;
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void appendTagAutocompleteCondition(final long startSecondTB,
+ final long endSecondTB,
+ final StringBuilder sql,
+ final List<Object> condition) {
+ long startMinTB = startSecondTB / 100;
+ long endMinTB = endSecondTB / 100;
+ if (startMinTB > 0) {
+ sql.append(" and ");
+ sql.append(TraceTagAutocompleteData.TIME_BUCKET).append(">=?");
+ condition.add(startMinTB);
+ }
+ if (endMinTB > 0) {
+ // Both callers open the WHERE clause with a predicate, so " and " is always required here.
+ sql.append(" and ");
+ sql.append(TraceTagAutocompleteData.TIME_BUCKET).append("<=?");
+ condition.add(endMinTB);
+ }
+ }
}
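The JDBC queries above assemble a parameterised time-range filter on top of a WHERE clause that already holds one predicate. A minimal sketch of that pattern follows, assuming hypothetical table and column names (trace_tag_autocomplete, tag_key, time_bucket) and a hypothetical helper class. It always prefixes each bound with " and ", which is valid only because the clause starts with 1=1.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: names below are assumptions, not taken from the plugin.
final class TagAutocompleteSqlSketch {
    static final String TABLE = "trace_tag_autocomplete";
    static final String TIME_BUCKET = "time_bucket";

    // Builds the SQL text and collects bind parameters in the same order
    // as their "?" placeholders appear.
    static String buildKeysSql(long startMinTB, long endMinTB, List<Object> params) {
        StringBuilder sql = new StringBuilder("select distinct tag_key from " + TABLE + " where 1=1");
        if (startMinTB > 0) {
            sql.append(" and ").append(TIME_BUCKET).append(" >= ?");
            params.add(startMinTB);
        }
        if (endMinTB > 0) {
            sql.append(" and ").append(TIME_BUCKET).append(" <= ?");
            params.add(endMinTB);
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        List<Object> params = new ArrayList<>();
        System.out.println(buildKeysSql(202204291200L, 202204291252L, params));
        System.out.println(params);
    }
}
```

Keeping the values in a parameter list rather than concatenating them into the SQL string lets the driver bind them, which is the same approach the diff takes with its condition list.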
diff --git a/oap-server/server-storage-plugin/storage-zipkin-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
index e236ce0770..e353490cb2 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
@@ -23,18 +23,23 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
+import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
@@ -50,10 +55,11 @@ import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
-
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.ENDPOINT_ID;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.IS_ERROR;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.LATENCY;
@@ -249,4 +255,69 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
}
return spanList;
}
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ BoolQueryBuilder query = Query.bool();
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
+ final SearchBuilder search = Search.builder().query(query);
+ search.aggregation(Aggregation.terms(TraceTagAutocompleteData.TAG_KEY)
+ .field(TraceTagAutocompleteData.TAG_KEY));
+
+ final SearchResponse response = getClient().search(
+ new TimeRangeIndexNameGenerator(TraceTagAutocompleteData.INDEX_NAME, startSecondTB, endSecondTB),
+ search.build()
+ );
+ Map<String, Object> terms =
+ (Map<String, Object>) response.getAggregations().get(TraceTagAutocompleteData.TAG_KEY);
+ List<Map<String, Object>> buckets = (List<Map<String, Object>>) terms.get("buckets");
+ Set<String> tagKeys = new HashSet<>();
+ for (Map<String, Object> bucket : buckets) {
+ String tagKey = (String) bucket.get("key");
+ if (tagKey == null) {
+ continue;
+ }
+ tagKeys.add(tagKey);
+ }
+ return tagKeys;
+ }
+
+ @Override
+ public Set<String> queryTraceTagAutocompleteValues(final String tagKey, final int limit, final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ BoolQueryBuilder query = Query.bool().must(Query.term(TraceTagAutocompleteData.TAG_KEY, tagKey));
+ appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
+ final SearchBuilder search = Search.builder().query(query).size(limit);
+
+ final SearchResponse response = getClient().search(
+ new TimeRangeIndexNameGenerator(
+ IndexController.LogicIndicesRegister.getPhysicalTableName(TraceTagAutocompleteData.INDEX_NAME),
+ startSecondTB, endSecondTB
+ ),
+ search.build()
+ );
+ Set<String> tagValues = new HashSet<>();
+ for (SearchHit searchHit : response.getHits().getHits()) {
+ TraceTagAutocompleteData tag = new TraceTagAutocompleteData.Builder().storage2Entity(
+ new HashMapConverter.ToEntity(searchHit.getSource()));
+ tagValues.add(tag.getTagValue());
+ }
+ return tagValues;
+ }
+
+ private void appendTagAutocompleteCondition(final long startSecondTB, final long endSecondTB, final BoolQueryBuilder query) {
+ long startMinTB = startSecondTB / 100;
+ long endMinTB = endSecondTB / 100;
+ final RangeQueryBuilder rangeQuery = Query.range(TraceTagAutocompleteData.TIME_BUCKET);
+ if (startMinTB > 0) {
+ rangeQuery.gte(startMinTB);
+ }
+ if (endMinTB > 0) {
+ rangeQuery.lte(endMinTB);
+ }
+ if (startMinTB > 0 || endMinTB > 0) {
+ query.must(rangeQuery);
+ }
+ }
}
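
The keys query above reads the terms aggregation result as nested maps: the aggregation entry holds a "buckets" list, and each bucket carries its term under "key". A minimal standalone sketch of that extraction follows, with type checks in place of unchecked casts. The class and method names are hypothetical, and the input is assumed to be the already-deserialized aggregations map rather than the project's SearchResponse type.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class TermsBucketSketch {
    // Collects the non-null bucket keys of one terms aggregation.
    // Returns an empty set when the aggregation or its bucket list is absent.
    static Set<String> extractKeys(final Map<String, Object> aggregations, final String aggName) {
        final Set<String> keys = new HashSet<>();
        final Object agg = aggregations.get(aggName);
        if (!(agg instanceof Map)) {
            return keys;
        }
        final Object buckets = ((Map<?, ?>) agg).get("buckets");
        if (!(buckets instanceof List)) {
            return keys;
        }
        for (final Object bucket : (List<?>) buckets) {
            if (!(bucket instanceof Map)) {
                continue;
            }
            final Object key = ((Map<?, ?>) bucket).get("key");
            if (key != null) {
                keys.add(key.toString());
            }
        }
        return keys;
    }
}
```

Returning an empty set for a missing aggregation is a design choice of this sketch; it keeps callers from having to handle null when the time range matches no index data.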
diff --git a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java
index b127022f64..330dbf026f 100644
--- a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java
+++ b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.tool.profile.exporter.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
@@ -83,4 +84,20 @@ public class ProfileTraceDAO implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
return null;
}
+
+ // No need to implement here
+ @Override
+ public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ return null;
+ }
+
+ // No need to implement here
+ @Override
+ public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
+ final int limit,
+ final long startSecondTB,
+ final long endSecondTB) throws IOException {
+ return null;
+ }
}