You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/08/10 23:26:07 UTC
[skywalking] branch master updated: [IMPORTANT] Query traces with
tags as condition (#5270)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 7f7e96b [IMPORTANT] Query traces with tags as condition (#5270)
7f7e96b is described below
commit 7f7e96b088d3cb0a19f8ddeafe5ede9764ec2eda
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Aug 11 07:24:40 2020 +0800
[IMPORTANT] Query traces with tags as condition (#5270)
---
docs/en/setup/backend/configuration-vocabulary.md | 9 +-
.../parser/listener/SegmentAnalysisListener.java | 30 ++++-
.../src/main/resources/application.yml | 8 +-
.../apache/skywalking/oap/server/core/Const.java | 1 +
.../oap/server/core/CoreModuleConfig.java | 24 ++++
.../analysis/manual/segment/SegmentDispatcher.java | 2 +
.../analysis/manual/segment/SegmentRecord.java | 15 +++
.../core/analysis/manual/segment/SpanTag.java | 58 ++++++++
.../oap/server/core/config/ConfigService.java | 2 +
.../oap/server/core/query/TraceQueryService.java | 7 +-
.../core/query/input/TraceQueryCondition.java | 3 +
.../skywalking/oap/server/core/source/Segment.java | 6 +
.../server/core/storage/model/DataTypeMapping.java | 8 +-
.../oap/server/core/storage/model/ModelColumn.java | 15 +--
.../server/core/storage/model/StorageModels.java | 4 +-
.../server/core/storage/query/ITraceQueryDAO.java | 18 ++-
.../server/core/storage/model/ModelColumnTest.java | 10 +-
.../oap/query/graphql/resolver/TraceQuery.java | 2 +-
.../src/main/resources/query-protocol | 2 +-
.../server/receiver/trace/mock/ServiceAMock.java | 6 +-
.../elasticsearch/base/ColumnTypeEsMapping.java | 8 +-
.../elasticsearch/base/StorageEsInstaller.java | 4 +-
.../elasticsearch/query/TraceQueryEsDAO.java | 20 ++-
.../ElasticSearchColumnTypeMappingTestCase.java | 22 +--
.../elasticsearch7/query/TraceQueryEs7DAO.java | 19 ++-
.../storage/plugin/influxdb/TableMetaInfo.java | 5 +
.../plugin/influxdb/base/InfluxInsertRequest.java | 9 ++
.../storage/plugin/influxdb/base/RecordDAO.java | 21 +++
.../storage/plugin/influxdb/query/TraceQuery.java | 13 +-
.../elasticsearch/JaegerTraceQueryEsDAO.java | 4 +-
.../storage/plugin/jdbc/h2/H2StorageConfig.java | 29 ++++
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 47 +++++--
.../storage/plugin/jdbc/h2/dao/H2RecordDAO.java | 41 +++++-
.../storage/plugin/jdbc/h2/dao/H2SQLExecutor.java | 33 +++--
.../plugin/jdbc/h2/dao/H2SegmentRecordBuilder.java | 149 +++++++++++++++++++++
.../storage/plugin/jdbc/h2/dao/H2StorageDAO.java | 15 ++-
.../plugin/jdbc/h2/dao/H2TableInstaller.java | 50 +++++--
.../plugin/jdbc/h2/dao/H2TraceQueryDAO.java | 54 +++++++-
.../plugin/jdbc/mysql/MySQLStorageConfig.java | 16 ++-
.../plugin/jdbc/mysql/MySQLStorageProvider.java | 43 ++++--
.../plugin/jdbc/mysql/MySQLTableInstaller.java | 58 +++++---
.../plugin/jdbc/mysql/MySQLTraceQueryDAO.java | 8 +-
.../elasticsearch/ZipkinTraceQueryEsDAO.java | 4 +-
.../profile/exporter/test/ProfileTraceDAO.java | 4 +-
44 files changed, 772 insertions(+), 134 deletions(-)
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index c0ce056..6bb7a57 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -29,6 +29,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | serviceNameMaxLength| Max length limitation of service name.|SW_SERVICE_NAME_MAX_LENGTH|70|
| - | - | instanceNameMaxLength| Max length limitation of service instance name. The max length of service + instance names should be less than 200.|SW_INSTANCE_NAME_MAX_LENGTH|70|
| - | - | endpointNameMaxLength| Max length limitation of endpoint name. The max length of service + endpoint names should be less than 240.|SW_ENDPOINT_NAME_MAX_LENGTH|150|
+| - | - | searchableTracesTags | Define the set of span tag keys that should be searchable through GraphQL. Multiple values should be separated by commas. | SW_SEARCHABLE_TAG_KEYS | http.method,status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker|
| - | - | gRPCThreadPoolSize|Pool size of gRPC server| - | CPU core * 4|
| - | - | gRPCThreadPoolQueueSize| The queue size of gRPC server| - | 10000|
| - | - | maxConcurrentCallsPerConnection | The maximum number of concurrent calls permitted for each incoming connection. Defaults to no limit. | - | - |
@@ -113,10 +114,14 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | url | H2 connection URL. Default is H2 memory mode | SW_STORAGE_H2_URL | jdbc:h2:mem:skywalking-oap-db |
| - | - | user | User name of H2 database. | SW_STORAGE_H2_USER | sa |
| - | - | password | Password of H2 database. | - | - |
-| - | - | metadataQueryMaxSize | The max size of metadata per query. | - | 5000 |
+| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_H2_QUERY_MAX_SIZE | 5000 |
+| - | - | maxSizeOfArrayColumn | Some entities, such as trace segments, include logical columns with multiple values. In H2, multiple physical columns are used to host the values. For example, column_a with values [1,2,3,4,5] is stored as `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3, column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
+| - | - | numOfSearchableValuesPerTag | A trace segment includes multiple spans with multiple tags. Different spans could have the same tag keys, such as multiple HTTP exit spans all having their own `http.method` tag. This configuration sets the limit on the max number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - |mysql| - | MySQL Storage. The MySQL JDBC Driver is not in the dist, please copy it into oap-lib folder manually | - | - |
| - | - | properties | Hikari connection pool configurations | - | Listed in the `application.yaml`. |
-| - | - | metadataQueryMaxSize | The max size of metadata per query. | - | 5000 |
+| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
+| - | - | maxSizeOfArrayColumn | Some entities, such as trace segments, include logical columns with multiple values. In MySQL, multiple physical columns are used to host the values. For example, column_a with values [1,2,3,4,5] is stored as `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3, column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
+| - | - | numOfSearchableValuesPerTag | A trace segment includes multiple spans with multiple tags. Different spans could have the same tag keys, such as multiple HTTP exit spans all having their own `http.method` tag. This configuration sets the limit on the max number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - |influxdb| - | InfluxDB storage. |- | - |
| - | - | url| InfluxDB connection URL. | SW_STORAGE_INFLUXDB_URL | http://localhost:8086|
| - | - | user | User name of InfluxDB. | SW_STORAGE_INFLUXDB_USER | root|
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
index b0f3461..bb88786 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
@@ -18,22 +18,26 @@
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
+import java.util.Arrays;
+import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.Segment;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
/**
* SegmentSpanListener forwards the segment raw data to the persistence layer with the query required conditions.
@@ -44,6 +48,7 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
private final SourceReceiver sourceReceiver;
private final TraceSegmentSampler sampler;
private final NamingControl namingControl;
+ private final List<String> searchableTagKeys;
private final Segment segment = new Segment();
private SAMPLE_STATUS sampleStatus = SAMPLE_STATUS.UNKNOWN;
@@ -96,7 +101,6 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
serviceId,
endpointName
);
-
}
@Override
@@ -144,11 +148,21 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
if (!isError && span.getIsError()) {
isError = true;
}
+
+ appendSearchableTags(span);
});
final long accurateDuration = endTimestamp - startTimestamp;
duration = accurateDuration > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) accurateDuration;
}
+ private void appendSearchableTags(SpanObject span) {
+ span.getTagsList().forEach(tag -> {
+ if (searchableTagKeys.contains(tag.getKey())) {
+ segment.getTags().add(new SpanTag(tag.getKey(), tag.getValue()));
+ }
+ });
+ }
+
@Override
public void build() {
if (log.isDebugEnabled()) {
@@ -173,9 +187,14 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
private final SourceReceiver sourceReceiver;
private final TraceSegmentSampler sampler;
private final NamingControl namingControl;
+ private final List<String> searchTagKeys;
public Factory(ModuleManager moduleManager, AnalyzerModuleConfig config) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+ final ConfigService configService = moduleManager.find(CoreModule.NAME)
+ .provider()
+ .getService(ConfigService.class);
+ this.searchTagKeys = Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA));
this.sampler = new TraceSegmentSampler(config.getTraceSampleRateWatcher());
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
@@ -184,7 +203,12 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
@Override
public AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config) {
- return new SegmentAnalysisListener(sourceReceiver, sampler, namingControl);
+ return new SegmentAnalysisListener(
+ sourceReceiver,
+ sampler,
+ namingControl,
+ searchTagKeys
+ );
}
}
}
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index b6c585a..13a4f17 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -89,6 +89,8 @@ core:
instanceNameMaxLength: ${SW_INSTANCE_NAME_MAX_LENGTH:70}
# The max length of service + endpoint names should be less than 240
endpointNameMaxLength: ${SW_ENDPOINT_NAME_MAX_LENGTH:150}
+ # Define the set of span tag keys that should be searchable through GraphQL. Multiple keys are comma-separated.
+ searchableTracesTags: ${SW_SEARCHABLE_TAG_KEYS:http.method,status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker}
storage:
selector: ${SW_STORAGE:h2}
elasticsearch:
@@ -140,6 +142,8 @@ storage:
url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
user: ${SW_STORAGE_H2_USER:sa}
metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
+ maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
+ numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
mysql:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
@@ -150,6 +154,8 @@ storage:
dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048}
dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true}
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
+ maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
+ numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
influxdb:
# InfluxDB configuration
url: ${SW_STORAGE_INFLUXDB_URL:http://localhost:8086}
@@ -229,7 +235,7 @@ kafka-fetcher:
receiver-meter:
selector: ${SW_RECEIVER_METER:-}
default:
-
+
receiver-oc:
selector: ${SW_OC_RECEIVER:-}
default:
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
index e67871b..3592bc8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
@@ -27,6 +27,7 @@ public class Const {
public static final String RELATION_ID_CONNECTOR = "-";
public static final String RELATION_ID_PARSER_SPLIT = "\\-";
public static final String LINE = "-";
+ public static final String COMMA = ",";
public static final String SPACE = " ";
public static final String KEY_VALUE_SPLIT = ",";
public static final String ARRAY_SPLIT = "|";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 4682d40..e36ce7c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
+import lombok.Setter;
import org.apache.skywalking.oap.server.core.source.ScopeDefaultColumn;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@@ -116,6 +117,14 @@ public class CoreModuleConfig extends ModuleConfig {
* In the current practice, we don't recommend the length over 190.
*/
private int endpointNameMaxLength = 150;
+ /**
+ * Define the set of span tag keys, which should be searchable through the GraphQL.
+ *
+ * @since 8.2.0
+ */
+ @Setter
+ @Getter
+ private String searchableTracesTags = DEFAULT_SEARCHABLE_TAG_KEYS;
public CoreModuleConfig() {
this.downsampling = new ArrayList<>();
@@ -142,4 +151,19 @@ public class CoreModuleConfig extends ModuleConfig {
*/
Aggregator
}
+
+ /**
+ * SkyWalking Java Agent provides the recommended tag keys for other language agents or SDKs. This field declares the
+ * recommended keys that should be searchable.
+ */
+ private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join(
+ Const.COMMA,
+ "http.method",
+ "status_code",
+ "db.type",
+ "db.instance",
+ "mq.queue",
+ "mq.topic",
+ "mq.broker"
+ );
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
index 69f8fab..69036ba 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
@@ -40,6 +40,8 @@ public class SegmentDispatcher implements SourceDispatcher<Segment> {
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setVersion(source.getVersion());
+ segment.setTagsRawData(source.getTags());
+ segment.setTags(SpanTag.Util.toStringList(source.getTags()));
RecordStreamProcessor.getInstance().in(segment);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
index a8d523c..bf7369e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.manual.segment;
import java.util.Base64;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import joptsimple.internal.Strings;
import lombok.Getter;
@@ -53,6 +54,7 @@ public class SegmentRecord extends Record {
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
public static final String VERSION = "version";
+ public static final String TAGS = "tags";
@Setter
@Getter
@@ -106,6 +108,17 @@ public class SegmentRecord extends Record {
@Getter
@Column(columnName = VERSION, storageOnly = true)
private int version;
+ @Setter
+ @Getter
+ @Column(columnName = TAGS)
+ private List<String> tags;
+ /**
+ * Tags raw data is a duplicate field of {@link #tags}. Some storage don't support array values in a single column.
+ * Then, those implementations could use this raw data to generate necessary data structures.
+ */
+ @Setter
+ @Getter
+ private List<SpanTag> tagsRawData;
@Override
public String id() {
@@ -139,6 +152,7 @@ public class SegmentRecord extends Record {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
map.put(VERSION, storageData.getVersion());
+ map.put(TAGS, storageData.getTags());
return map;
}
@@ -163,6 +177,7 @@ public class SegmentRecord extends Record {
record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY)));
}
record.setVersion(((Number) dbMap.get(VERSION)).intValue());
+ // Don't read the tags as they have been in the data binary already.
return record;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SpanTag.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SpanTag.java
new file mode 100644
index 0000000..4dc2514
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SpanTag.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.segment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+@Getter
+@Setter
+public class SpanTag {
+ private String key;
+ private String value;
+
+ public SpanTag() {
+ }
+
+ public SpanTag(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return key + "=" + value;
+ }
+
+ public static class Util {
+ public static List<String> toStringList(List<SpanTag> list) {
+ if (CollectionUtils.isEmpty(list)) {
+ return Collections.emptyList();
+ }
+ List<String> result = new ArrayList<>(list.size());
+ list.forEach(e -> result.add(e.toString()));
+ return result;
+ }
+ }
+}
+
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
index 923d101..a5fd428 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
@@ -26,9 +26,11 @@ import org.apache.skywalking.oap.server.library.module.Service;
public class ConfigService implements Service {
private final String gRPCHost;
private final int gRPCPort;
+ private final String searchableTracesTags;
public ConfigService(CoreModuleConfig moduleConfig) {
this.gRPCHost = moduleConfig.getGRPCHost();
this.gRPCPort = moduleConfig.getGRPCPort();
+ this.searchableTracesTags = moduleConfig.getSearchableTracesTags();
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
index 4fc0c87..1b73ffd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
@@ -27,6 +27,7 @@ import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
@@ -83,13 +84,13 @@ public class TraceQueryService implements Service {
final QueryOrder queryOrder,
final Pagination paging,
final long startTB,
- final long endTB) throws IOException {
+ final long endTB,
+ final List<SpanTag> tags) throws IOException {
PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(paging);
return getTraceQueryDAO().queryBasicTraces(
startTB, endTB, minTraceDuration, maxTraceDuration, endpointName, serviceId, serviceInstanceId, endpointId,
- traceId, page
- .getLimit(), page.getFrom(), traceState, queryOrder
+ traceId, page.getLimit(), page.getFrom(), traceState, queryOrder, tags
);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TraceQueryCondition.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TraceQueryCondition.java
index 86cfb14..2b6845f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TraceQueryCondition.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TraceQueryCondition.java
@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.core.query.input;
+import java.util.List;
import lombok.Getter;
import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.Pagination;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
@@ -38,4 +40,5 @@ public class TraceQueryCondition {
private TraceState traceState;
private QueryOrder queryOrder;
private Pagination paging;
+ private List<SpanTag> tags;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
index 2bdec9a..f7b58f5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.core.source;
+import java.util.ArrayList;
+import java.util.List;
import lombok.Getter;
import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SEGMENT;
@@ -72,4 +75,7 @@ public class Segment extends Source {
@Setter
@Getter
private int version;
+ @Setter
+ @Getter
+ private List<SpanTag> tags = new ArrayList<>();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/DataTypeMapping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/DataTypeMapping.java
index 4ad3622..1f679de 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/DataTypeMapping.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/DataTypeMapping.java
@@ -18,7 +18,11 @@
package org.apache.skywalking.oap.server.core.storage.model;
-public interface DataTypeMapping {
+import java.lang.reflect.Type;
- String transform(Class<?> type);
+public interface DataTypeMapping {
+ /**
+ * Map the given type and genericType of the field to the column type.
+ */
+ String transform(Class<?> type, Type genericType);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumn.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumn.java
index 7f8c9a0..0d0c2bd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumn.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumn.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.model;
-import com.google.gson.JsonObject;
+import java.lang.reflect.Type;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
@@ -26,28 +26,23 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
public class ModelColumn {
private final ColumnName columnName;
private final Class<?> type;
+ private final Type genericType;
private final boolean matchQuery;
private final boolean storageOnly;
private final int length;
public ModelColumn(ColumnName columnName,
Class<?> type,
+ Type genericType,
boolean matchQuery,
boolean storageOnly,
boolean isValue,
int length) {
this.columnName = columnName;
this.type = type;
+ this.genericType = genericType;
this.matchQuery = matchQuery;
-
- /*
- * Only accept length in the String/JsonObject definition.
- */
- if (type.equals(String.class) || type.equals(JsonObject.class)) {
- this.length = length;
- } else {
- this.length = 0;
- }
+ this.length = length;
/*
* byte[] and {@link IntKeyLongValueHashMap} could never be query.
*/
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 6000a93..600dbe9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -122,8 +122,8 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
}
modelColumns.add(
new ModelColumn(
- new ColumnName(modelName, column.columnName()), field.getType(), column.matchQuery(),
- column.storageOnly(), column.dataType().isValue(), columnLength
+ new ColumnName(modelName, column.columnName()), field.getType(), field.getGenericType(),
+ column.matchQuery(), column.storageOnly(), column.dataType().isValue(), columnLength
));
if (log.isDebugEnabled()) {
log.debug("The field named {} with the {} type", column.columnName(), field.getType());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
index 3660034..35e48cf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
@@ -29,9 +30,20 @@ import org.apache.skywalking.oap.server.library.module.Service;
public interface ITraceQueryDAO extends Service {
- TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
- String endpointName, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from,
- TraceState traceState, QueryOrder queryOrder) throws IOException;
+ TraceBrief queryBasicTraces(long startSecondTB,
+ long endSecondTB,
+ long minDuration,
+ long maxDuration,
+ String endpointName,
+ String serviceId,
+ String serviceInstanceId,
+ String endpointId,
+ String traceId,
+ int limit,
+ int from,
+ TraceState traceState,
+ QueryOrder queryOrder,
+ final List<SpanTag> tags) throws IOException;
List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
index 7c2d0a2..670eac2 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
@@ -25,20 +25,20 @@ import org.junit.Test;
public class ModelColumnTest {
@Test
public void testColumnDefine() {
- ModelColumn column = new ModelColumn(new ColumnName("", "abc"), byte[].class, true,
+ ModelColumn column = new ModelColumn(new ColumnName("", "abc"), byte[].class, byte[].class, true,
false, true, 0
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
- column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, true,
+ column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, DataTable.class, true,
false, true, 200
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
- Assert.assertEquals(0, column.getLength());
+ Assert.assertEquals(200, column.getLength());
- column = new ModelColumn(new ColumnName("", "abc"), String.class, true,
+ column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class, true,
false, true, 200
);
Assert.assertEquals(false, column.isStorageOnly());
@@ -47,7 +47,7 @@ public class ModelColumnTest {
@Test(expected = IllegalArgumentException.class)
public void testConflictDefinition() {
- ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class,
+ ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class,
true, true, true, 200
);
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java
index 804334e..d9df534 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java
@@ -75,7 +75,7 @@ public class TraceQuery implements GraphQLQueryResolver {
return getQueryService().queryBasicTraces(
condition.getServiceId(), condition.getServiceInstanceId(), endpointId, traceId, endpointName, minDuration,
- maxDuration, traceState, queryOrder, pagination, startSecondTB, endSecondTB
+ maxDuration, traceState, queryOrder, pagination, startSecondTB, endSecondTB, condition.getTags()
);
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index 563bb51..f38def1 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit 563bb51c71922f017911345d7cd5c62a7ac8995c
+Subproject commit f38def1d502327856c1cae7ceb233f3c0c8c8e2a
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java
index f72ec1b..6192eb5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.trace.mock;
import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
@@ -29,7 +30,7 @@ class ServiceAMock {
public static String SERVICE_NAME = "mock_a_service";
public static String SERVICE_INSTANCE_NAME = "mock_a_service_instance";
- static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest";
+ static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest/404-test";
static String DUBBO_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()";
static String DUBBO_ADDRESS = "DubboIPAddress:1000";
@@ -62,6 +63,9 @@ class ServiceAMock {
span.setComponentId(ComponentsDefine.TOMCAT.getId());
span.setOperationName(REST_ENDPOINT);
span.setIsError(false);
+ span.addTags(KeyStringValuePair.newBuilder().setKey("http.method").setValue("get").build());
+ span.addTags(KeyStringValuePair.newBuilder().setKey("status_code").setValue("404").build());
+ span.addTags(KeyStringValuePair.newBuilder().setKey("status_code").setValue("200").build());
return span;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index 7cec27e..6acc3be 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -19,6 +19,9 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import com.google.gson.JsonObject;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
@@ -26,7 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObje
public class ColumnTypeEsMapping implements DataTypeMapping {
@Override
- public String transform(Class<?> type) {
+ public String transform(Class<?> type, Type genericType) {
if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) {
return "integer";
} else if (Long.class.equals(type) || long.class.equals(type)) {
@@ -41,6 +44,9 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
return "binary";
} else if (JsonObject.class.equals(type)) {
return "text";
+ } else if (List.class.isAssignableFrom(type)) {
+ final Type elementType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
+ return transform((Class<?>) elementType, elementType);
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 143d095..9dc331b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -133,7 +133,7 @@ public class StorageEsInstaller extends ModelInstaller {
String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
Map<String, Object> originalColumn = new HashMap<>();
- originalColumn.put("type", columnTypeEsMapping.transform(columnDefine.getType()));
+ originalColumn.put("type", columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType()));
originalColumn.put("copy_to", matchCName);
properties.put(columnDefine.getColumnName().getName(), originalColumn);
@@ -143,7 +143,7 @@ public class StorageEsInstaller extends ModelInstaller {
properties.put(matchCName, matchColumn);
} else {
Map<String, Object> column = new HashMap<>();
- column.put("type", columnTypeEsMapping.transform(columnDefine.getType()));
+ column.put("type", columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType()));
if (columnDefine.isStorageOnly()) {
column.put("index", false);
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
index 192be91..047be7a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
@@ -34,6 +35,7 @@ import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
@@ -67,7 +69,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
- QueryOrder queryOrder) throws IOException {
+ QueryOrder queryOrder,
+ final List<SpanTag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
@@ -120,6 +123,13 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC);
break;
}
+ if (CollectionUtils.isNotEmpty(tags)) {
+ BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
+ tags.forEach(tag -> {
+ tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString()));
+ });
+ mustQueryList.add(tagMatchQuery);
+ }
sourceBuilder.size(limit);
sourceBuilder.from(from);
@@ -135,9 +145,11 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
- basicTrace.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap()
- .get(
- SegmentRecord.IS_ERROR)).intValue()));
+ basicTrace.setError(
+ BooleanUtils.valueToBoolean(
+ ((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue()
+ )
+ );
basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
traceBrief.getTraces().add(basicTrace);
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
index 807a7b8..ae1ecc7 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
@@ -18,24 +18,30 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+import java.lang.reflect.Type;
+import java.util.List;
import org.junit.Assert;
import org.junit.Test;
public class ElasticSearchColumnTypeMappingTestCase {
+ public List<String> a;
@Test
- public void test() {
+ public void test() throws NoSuchFieldException {
ColumnTypeEsMapping mapping = new ColumnTypeEsMapping();
- Assert.assertEquals("integer", mapping.transform(int.class));
- Assert.assertEquals("integer", mapping.transform(Integer.class));
+ Assert.assertEquals("integer", mapping.transform(int.class, int.class));
+ Assert.assertEquals("integer", mapping.transform(Integer.class, Integer.class));
- Assert.assertEquals("long", mapping.transform(long.class));
- Assert.assertEquals("long", mapping.transform(Long.class));
+ Assert.assertEquals("long", mapping.transform(long.class, long.class));
+ Assert.assertEquals("long", mapping.transform(Long.class, Long.class));
- Assert.assertEquals("double", mapping.transform(double.class));
- Assert.assertEquals("double", mapping.transform(Double.class));
+ Assert.assertEquals("double", mapping.transform(double.class, double.class));
+ Assert.assertEquals("double", mapping.transform(Double.class, Double.class));
- Assert.assertEquals("keyword", mapping.transform(String.class));
+ Assert.assertEquals("keyword", mapping.transform(String.class, String.class));
+
+ final Type listFieldType = this.getClass().getField("a").getGenericType();
+ Assert.assertEquals("keyword", mapping.transform(List.class, listFieldType));
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/TraceQueryEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/TraceQueryEs7DAO.java
index c78628d..2077fa9 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/TraceQueryEs7DAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/TraceQueryEs7DAO.java
@@ -23,12 +23,14 @@ import java.io.IOException;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
@@ -59,7 +61,8 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO {
int limit,
int from,
TraceState traceState,
- QueryOrder queryOrder) throws IOException {
+ QueryOrder queryOrder,
+ final List<SpanTag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
@@ -112,6 +115,13 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO {
sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC);
break;
}
+ if (CollectionUtils.isNotEmpty(tags)) {
+ BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
+ tags.forEach(tag -> {
+ tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString()));
+ });
+ mustQueryList.add(tagMatchQuery);
+ }
sourceBuilder.size(limit);
sourceBuilder.from(from);
@@ -127,9 +137,10 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO {
basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
- basicTrace.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap()
- .get(
- SegmentRecord.IS_ERROR)).intValue()));
+ basicTrace.setError(
+ BooleanUtils.valueToBoolean(
+ ((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue())
+ );
basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
traceBrief.getTraces().add(basicTrace);
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
index 3d73663..dfd4476 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
@@ -80,6 +80,11 @@ public class TableMetaInfo {
if (storageAndColumnMap.containsKey(SegmentRecord.SERVICE_ID)) {
storageAndTagMap.put(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
}
+
+ // The tags field of SegmentRecord is stored as tag only. See SegmentRecord.DATA_BINARY
+ if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
+ storageAndColumnMap.remove(SegmentRecord.TAGS);
+ }
}
TableMetaInfo info = TableMetaInfo.builder()
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
index f984372..c510d2e 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -40,6 +41,9 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
public InfluxInsertRequest(Model model, StorageData storageData, StorageBuilder storageBuilder) {
Map<String, Object> objectMap = storageBuilder.data2Map(storageData);
+ if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
+ objectMap.remove(SegmentRecord.TAGS);
+ }
for (ModelColumn column : model.getColumns()) {
Object value = objectMap.get(column.getColumnName().getName());
@@ -68,6 +72,11 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
return this;
}
+ public InfluxInsertRequest tag(String key, String value) {
+ builder.tag(key, value);
+ return this;
+ }
+
public Point getPoint() {
return builder.build();
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
index 63739c1..a3d1bc7 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
@@ -18,10 +18,16 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
+import com.google.common.base.Joiner;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
@@ -52,6 +58,21 @@ public class RecordDAO implements IRecordDAO {
TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
request.addFieldAsTag(field, tag);
});
+
+ if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
+ Map<String, List<SpanTag>> collect = ((SegmentRecord) record).getTagsRawData()
+ .stream()
+ .collect(
+ Collectors.groupingBy(SpanTag::getKey));
+ collect.entrySet().forEach(e -> {
+ request.tag(e.getKey(), "'" + Joiner.on("'")
+ .join(e.getValue()
+ .stream()
+ .map(SpanTag::getValue)
+ .collect(Collectors.toSet())) + "'");
+ });
+ }
return request;
}
+
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
index f315082..f01602b 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
@@ -26,6 +26,7 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
@@ -33,12 +34,14 @@ import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
+import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
@@ -69,7 +72,8 @@ public class TraceQuery implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
- QueryOrder queryOrder)
+ QueryOrder queryOrder,
+ final List<SpanTag> tags)
throws IOException {
String orderBy = SegmentRecord.START_TIME;
@@ -121,6 +125,13 @@ public class TraceQuery implements ITraceQueryDAO {
recallQuery.and(eq(SegmentRecord.IS_ERROR, BooleanUtils.FALSE));
break;
}
+ if (CollectionUtils.isNotEmpty(tags)) {
+ WhereNested<WhereQueryImpl<SelectQueryImpl>> nested = recallQuery.andNested();
+ for (final SpanTag tag : tags) {
+ nested.and(contains(tag.getKey(), "'" + tag.getValue() + "'"));
+ }
+ nested.close();
+ }
WhereQueryImpl<SelectQueryImpl> countQuery = select()
.count(SegmentRecord.ENDPOINT_ID)
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
index 6bcd4af..4e8c48f 100644
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
@@ -30,6 +30,7 @@ import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
@@ -89,7 +90,8 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
- QueryOrder queryOrder) throws IOException {
+ QueryOrder queryOrder,
+ final List<SpanTag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
index b39c2f2..75667e6 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
@@ -30,4 +30,33 @@ public class H2StorageConfig extends ModuleConfig {
private String user = "";
private String password = "";
private int metadataQueryMaxSize = 5000;
+ /**
+ * Some entities, such as the trace segment, include a logical column with multiple values. Some storages support this
+ * kind of data structure, but H2 doesn't.
+ *
+ * In the H2, we use multiple physical columns to host the values, such as,
+ *
+ * Change column_a with values [1,2,3,4,5] to
+ * <p>
+ * column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5
+ * </p>
+ *
+ * This configuration controls how many physical columns should be added, and also limits the max number of
+ * values of this kind of column.
+ *
+ * SkyWalking doesn't create a new table for indexing, because it would amplify the size of the data set by dozens of
+ * times, which is not practical in a production environment.
+ *
+ * @since 8.2.0
+ */
+ private int maxSizeOfArrayColumn = 20;
+ /**
+ * A trace segment includes multiple spans with multiple tags. Different spans could have the same tag keys, such
+ * as multiple HTTP exit spans all having their own `http.method` tag.
+ *
+ * This configuration sets the limit on the max number of values for the same tag key.
+ *
+ * @since 8.2.0
+ */
+ private int numOfSearchableValuesPerTag = 2;
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 2653522..82db553 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -20,7 +20,9 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
@@ -109,39 +111,64 @@ public class H2StorageProvider extends ModuleProvider {
h2Client = new JDBCHikariCPClient(settings);
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
- this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(h2Client));
+ this.registerServiceImplementation(
+ StorageDAO.class,
+ new H2StorageDAO(
+ getManager(), h2Client, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
+ );
this.registerServiceImplementation(
- INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client));
+ INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client));
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(h2Client));
- this.registerServiceImplementation(ITraceQueryDAO.class, new H2TraceQueryDAO(h2Client));
this.registerServiceImplementation(
- IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
+ ITraceQueryDAO.class, new H2TraceQueryDAO(
+ getManager(),
+ h2Client,
+ config.getMaxSizeOfArrayColumn(),
+ config.getNumOfSearchableValuesPerTag()
+ ));
+ this.registerServiceImplementation(
+ IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
this.registerServiceImplementation(
- IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
+ IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
this.registerServiceImplementation(ILogQueryDAO.class, new H2LogQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client));
this.registerServiceImplementation(
- IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client));
+ IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client));
this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(h2Client));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
- HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ final ConfigService configService = getManager().find(CoreModule.NAME)
+ .provider()
+ .getService(ConfigService.class);
+ final int numOfSearchableTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
+ if (numOfSearchableTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
+ throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTags
+ + "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ + "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ + "]. Potential out of bound in the runtime.");
+ }
+
+ MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
+ .provider()
+ .getService(MetricsCreator.class);
+ HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
+ "storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
h2Client.registerChecker(healthChecker);
try {
h2Client.connect();
- H2TableInstaller installer = new H2TableInstaller(h2Client, getManager());
+ H2TableInstaller installer = new H2TableInstaller(
+ h2Client, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag());
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
@@ -155,6 +182,6 @@ public class H2StorageProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
- return new String[]{CoreModule.NAME};
+ return new String[] {CoreModule.NAME};
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
index c0c08eb..2cce60f 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
@@ -19,25 +19,58 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO {
-
private JDBCHikariCPClient h2Client;
private StorageBuilder<Record> storageBuilder;
+ private final int maxSizeOfArrayColumn;
- public H2RecordDAO(JDBCHikariCPClient h2Client, StorageBuilder<Record> storageBuilder) {
+ public H2RecordDAO(ModuleManager manager,
+ JDBCHikariCPClient h2Client,
+ StorageBuilder<Record> storageBuilder,
+ final int maxSizeOfArrayColumn,
+ final int numOfSearchableValuesPerTag) {
this.h2Client = h2Client;
- this.storageBuilder = storageBuilder;
+ try {
+ if (SegmentRecord.class
+ .equals(
+ storageBuilder.getClass().getMethod("map2Data", Map.class).getReturnType()
+ )
+ ) {
+ this.maxSizeOfArrayColumn = maxSizeOfArrayColumn;
+ final ConfigService configService = manager.find(CoreModule.NAME)
+ .provider()
+ .getService(ConfigService.class);
+ this.storageBuilder = new H2SegmentRecordBuilder(
+ maxSizeOfArrayColumn,
+ numOfSearchableValuesPerTag,
+ Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA))
+ );
+ } else {
+ this.maxSizeOfArrayColumn = 1;
+ this.storageBuilder = storageBuilder;
+ }
+ } catch (NoSuchMethodException e) {
+ throw new UnexpectedException("Can't find the SegmentRecord$Builder.map2Data method.");
+ }
}
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- return getInsertExecutor(model.getName(), record, storageBuilder);
+ return getInsertExecutor(model.getName(), record, storageBuilder, maxSizeOfArrayColumn);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
index 04c79ca..b63e910 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
@@ -105,6 +105,12 @@ public class H2SQLExecutor {
protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
StorageBuilder<T> storageBuilder) throws IOException {
+ return getInsertExecutor(modelName, metrics, storageBuilder, 1);
+ }
+
+ protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
+ StorageBuilder<T> storageBuilder,
+ int maxSizeOfArrayColumn) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(metrics);
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + " VALUES");
@@ -114,16 +120,27 @@ public class H2SQLExecutor {
param.add(metrics.id());
for (int i = 0; i < columns.size(); i++) {
ModelColumn column = columns.get(i);
- sqlBuilder.append("?");
- if (i != columns.size() - 1) {
- sqlBuilder.append(",");
+ if (List.class.isAssignableFrom(column.getType())) {
+ for (int physicalColumnIdx = 0; physicalColumnIdx < maxSizeOfArrayColumn; physicalColumnIdx++) {
+ sqlBuilder.append("?");
+ param.add(objectMap.get(column.getColumnName().getName() + "_" + physicalColumnIdx));
+ if (physicalColumnIdx != maxSizeOfArrayColumn - 1) {
+ sqlBuilder.append(",");
+ }
+ }
+ } else {
+ sqlBuilder.append("?");
+
+ Object value = objectMap.get(column.getColumnName().getName());
+ if (value instanceof StorageDataComplexObject) {
+ param.add(((StorageDataComplexObject) value).toStorageData());
+ } else {
+ param.add(value);
+ }
}
- Object value = objectMap.get(column.getColumnName().getName());
- if (value instanceof StorageDataComplexObject) {
- param.add(((StorageDataComplexObject) value).toStorageData());
- } else {
- param.add(value);
+ if (i != columns.size() - 1) {
+ sqlBuilder.append(",");
}
}
sqlBuilder.append(")");
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SegmentRecordBuilder.java
new file mode 100644
index 0000000..522178a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SegmentRecordBuilder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import joptsimple.internal.Strings;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.DATA_BINARY;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.ENDPOINT_ID;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.ENDPOINT_NAME;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.END_TIME;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.IS_ERROR;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.LATENCY;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SEGMENT_ID;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SERVICE_ID;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SERVICE_INSTANCE_ID;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.START_TIME;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TAGS;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TIME_BUCKET;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TRACE_ID;
+import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.VERSION;
+
+/**
+ * Unlike the standard {@link org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.Builder},
+ * the H2/MySQL builder maps the tags into multiple physical columns.
+ */
+public class H2SegmentRecordBuilder implements StorageBuilder<Record> {
+ private int numOfSearchableValuesPerTag;
+ private final List<String> searchTagKeys;
+
+ public H2SegmentRecordBuilder(final int maxSizeOfArrayColumn,
+ final int numOfSearchableValuesPerTag,
+ final List<String> searchTagKeys) {
+ this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag;
+ final int maxNumOfTags = maxSizeOfArrayColumn / numOfSearchableValuesPerTag;
+ if (searchTagKeys.size() > maxNumOfTags) {
+ this.searchTagKeys = searchTagKeys.subList(0, maxNumOfTags);
+ } else {
+ this.searchTagKeys = searchTagKeys;
+ }
+ }
+
+ @Override
+ public Map<String, Object> data2Map(Record record) {
+ SegmentRecord storageData = (SegmentRecord) record;
+ storageData.setStatement(Strings.join(new String[] {
+ storageData.getEndpointName(),
+ storageData.getTraceId()
+ }, " - "));
+ Map<String, Object> map = new HashMap<>();
+ map.put(SEGMENT_ID, storageData.getSegmentId());
+ map.put(TRACE_ID, storageData.getTraceId());
+ map.put(TopN.STATEMENT, storageData.getStatement());
+ map.put(SERVICE_ID, storageData.getServiceId());
+ map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+ map.put(ENDPOINT_NAME, storageData.getEndpointName());
+ map.put(ENDPOINT_ID, storageData.getEndpointId());
+ map.put(START_TIME, storageData.getStartTime());
+ map.put(END_TIME, storageData.getEndTime());
+ map.put(LATENCY, storageData.getLatency());
+ map.put(IS_ERROR, storageData.getIsError());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
+ map.put(DATA_BINARY, Const.EMPTY_STRING);
+ } else {
+ map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
+ }
+ map.put(VERSION, storageData.getVersion());
+ storageData.getTagsRawData().forEach(spanTag -> {
+ final int index = searchTagKeys.indexOf(spanTag.getKey());
+ boolean shouldAdd = true;
+ int tagIdx = 0;
+ final String tagExpression = spanTag.toString();
+ for (int i = 0; i < numOfSearchableValuesPerTag; i++) {
+ tagIdx = index * numOfSearchableValuesPerTag + i;
+ final String previousValue = (String) map.get(TAGS + "_" + tagIdx);
+ if (previousValue == null) {
+ // Still have at least one available slot, add directly.
+ shouldAdd = true;
+ break;
+ }
+ // If value is duplicated with added one, ignore.
+ if (previousValue.equals(tagExpression)) {
+ shouldAdd = false;
+ break;
+ }
+ // Reached the last slot reserved for this tag; there is no room for another value.
+ if (i == numOfSearchableValuesPerTag - 1) {
+ shouldAdd = false;
+ }
+ }
+ if (shouldAdd) {
+ map.put(TAGS + "_" + tagIdx, tagExpression);
+ }
+ });
+ return map;
+ }
+
+ @Override
+ public Record map2Data(Map<String, Object> dbMap) {
+ SegmentRecord record = new SegmentRecord();
+ record.setSegmentId((String) dbMap.get(SEGMENT_ID));
+ record.setTraceId((String) dbMap.get(TRACE_ID));
+ record.setStatement((String) dbMap.get(TopN.STATEMENT));
+ record.setServiceId((String) dbMap.get(SERVICE_ID));
+ record.setServiceInstanceId((String) dbMap.get(SERVICE_INSTANCE_ID));
+ record.setEndpointName((String) dbMap.get(ENDPOINT_NAME));
+ record.setEndpointId((String) dbMap.get(ENDPOINT_ID));
+ record.setStartTime(((Number) dbMap.get(START_TIME)).longValue());
+ record.setEndTime(((Number) dbMap.get(END_TIME)).longValue());
+ record.setLatency(((Number) dbMap.get(LATENCY)).intValue());
+ record.setIsError(((Number) dbMap.get(IS_ERROR)).intValue());
+ record.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+ if (StringUtil.isEmpty((String) dbMap.get(DATA_BINARY))) {
+ record.setDataBinary(new byte[] {});
+ } else {
+ record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY)));
+ }
+ record.setVersion(((Number) dbMap.get(VERSION)).intValue());
+ // Don't read the tags, as they have already been stored in the data binary.
+ return record;
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
index 0b2bdf1..f81d03f 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -29,14 +30,14 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+@RequiredArgsConstructor
public class H2StorageDAO implements StorageDAO {
-
- private JDBCHikariCPClient h2Client;
-
- public H2StorageDAO(JDBCHikariCPClient h2Client) {
- this.h2Client = h2Client;
- }
+ private final ModuleManager manager;
+ private final JDBCHikariCPClient h2Client;
+ private final int maxSizeOfArrayColumn;
+ private final int numOfSearchableValuesPerTag;
@Override
public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
@@ -45,7 +46,7 @@ public class H2StorageDAO implements StorageDAO {
@Override
public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) {
- return new H2RecordDAO(h2Client, storageBuilder);
+ return new H2RecordDAO(manager, h2Client, storageBuilder, maxSizeOfArrayColumn, numOfSearchableValuesPerTag);
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
index ebbd958..536b618 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
@@ -19,8 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import com.google.gson.JsonObject;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.storage.StorageException;
@@ -44,8 +47,16 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
public class H2TableInstaller extends ModelInstaller {
public static final String ID_COLUMN = "id";
- public H2TableInstaller(Client client, ModuleManager moduleManager) {
+ protected final int maxSizeOfArrayColumn;
+ protected final int numOfSearchableValuesPerTag;
+
+ public H2TableInstaller(Client client,
+ ModuleManager moduleManager,
+ int maxSizeOfArrayColumn,
+ int numOfSearchableValuesPerTag) {
super(client, moduleManager);
+ this.maxSizeOfArrayColumn = maxSizeOfArrayColumn;
+ this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag;
}
@Override
@@ -67,9 +78,7 @@ public class H2TableInstaller extends ModelInstaller {
ModelColumn column = model.getColumns().get(i);
ColumnName name = column.getColumnName();
tableCreateSQL.appendLine(
- name.getStorageName() + " " + getColumnType(column) + (i != model
- .getColumns()
- .size() - 1 ? "," : ""));
+ getColumn(column) + (i != model.getColumns().size() - 1 ? "," : ""));
}
tableCreateSQL.appendLine(")");
@@ -88,22 +97,37 @@ public class H2TableInstaller extends ModelInstaller {
/**
* Set up the data type mapping between Java type and H2 database type
*/
- protected String getColumnType(ModelColumn column) {
- final Class<?> type = column.getType();
+ protected String getColumn(ModelColumn column) {
+ return transform(column, column.getType(), column.getGenericType());
+ }
+
+ protected String transform(ModelColumn column, Class<?> type, Type genericType) {
+ final String storageName = column.getColumnName().getStorageName();
if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) {
- return "INT";
+ return storageName + " INT";
} else if (Long.class.equals(type) || long.class.equals(type)) {
- return "BIGINT";
+ return storageName + " BIGINT";
} else if (Double.class.equals(type) || double.class.equals(type)) {
- return "DOUBLE";
+ return storageName + " DOUBLE";
} else if (String.class.equals(type)) {
- return "VARCHAR(" + column.getLength() + ")";
+ return storageName + " VARCHAR(" + column.getLength() + ")";
} else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
- return "VARCHAR(20000)";
+ return storageName + " VARCHAR(20000)";
} else if (byte[].class.equals(type)) {
- return "MEDIUMTEXT";
+ return storageName + " MEDIUMTEXT";
} else if (JsonObject.class.equals(type)) {
- return "VARCHAR(" + column.getLength() + ")";
+ return storageName + " VARCHAR(" + column.getLength() + ")";
+ } else if (List.class.isAssignableFrom(type)) {
+ final Type elementType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
+ String oneColumnType = transform(column, (Class<?>) elementType, elementType);
+ // Remove the storageName as prefix
+ oneColumnType = oneColumnType.substring(storageName.length());
+ StringBuilder columns = new StringBuilder();
+ for (int i = 0; i < maxSizeOfArrayColumn; i++) {
+ columns.append(storageName).append("_").append(i).append(oneColumnType)
+ .append(i == maxSizeOfArrayColumn - 1 ? "" : ",");
+ }
+ return columns.toString();
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
index d3ee419..35e9828 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
@@ -24,11 +24,16 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
@@ -36,14 +41,26 @@ import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.elasticsearch.search.sort.SortOrder;
public class H2TraceQueryDAO implements ITraceQueryDAO {
+ private ModuleManager manager;
private JDBCHikariCPClient h2Client;
-
- public H2TraceQueryDAO(JDBCHikariCPClient h2Client) {
+ private List<String> searchableTagKeys;
+ private int maxSizeOfArrayColumn;
+ private int numOfSearchableValuesPerTag;
+
+ public H2TraceQueryDAO(ModuleManager manager,
+ JDBCHikariCPClient h2Client,
+ final int maxSizeOfArrayColumn,
+ final int numOfSearchableValuesPerTag) {
this.h2Client = h2Client;
+ this.manager = manager;
+ this.maxSizeOfArrayColumn = maxSizeOfArrayColumn;
+ this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag;
}
@Override
@@ -59,7 +76,18 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
- QueryOrder queryOrder) throws IOException {
+ QueryOrder queryOrder,
+ final List<SpanTag> tags) throws IOException {
+ if (searchableTagKeys == null) {
+ final ConfigService configService = manager.find(CoreModule.NAME)
+ .provider()
+ .getService(ConfigService.class);
+ searchableTagKeys = Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA));
+ if (searchableTagKeys.size() > maxSizeOfArrayColumn) {
+ this.searchableTagKeys = searchableTagKeys.subList(0, maxSizeOfArrayColumn);
+ }
+ }
+
StringBuilder sql = new StringBuilder();
List<Object> parameters = new ArrayList<>(10);
@@ -101,6 +129,26 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
sql.append(" and ").append(SegmentRecord.TRACE_ID).append(" = ?");
parameters.add(traceId);
}
+ if (CollectionUtils.isNotEmpty(tags)) {
+ for (final SpanTag tag : tags) {
+ final int foundIdx = searchableTagKeys.indexOf(tag.getKey());
+ if (foundIdx > -1) {
+ sql.append(" and (");
+ for (int i = 0; i < numOfSearchableValuesPerTag; i++) {
+ final String physicalColumn = SegmentRecord.TAGS + "_" + (foundIdx * numOfSearchableValuesPerTag + i);
+ sql.append(physicalColumn).append(" = ? ");
+ parameters.add(tag.toString());
+ if (i != numOfSearchableValuesPerTag - 1) {
+ sql.append(" or ");
+ }
+ }
+ sql.append(")");
+ } else {
+ // If a required tag is not searchable, no segment can match, so skip running the real query.
+ return new TraceBrief();
+ }
+ }
+ }
switch (traceState) {
case ERROR:
sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.TRUE);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
index 1b44b1d..eb9b1ae 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
@@ -18,16 +18,26 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
+import java.util.Properties;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import java.util.Properties;
-
@Setter
@Getter
public final class MySQLStorageConfig extends ModuleConfig {
-
private int metadataQueryMaxSize = 5000;
+ /**
+ * Inherited from {@link org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig#getMaxSizeOfArrayColumn()}
+ *
+ * @since 8.2.0
+ */
+ private int maxSizeOfArrayColumn = 20;
+ /**
+ * Inherited from {@link org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig#getNumOfSearchableValuesPerTag()}
+ *
+ * @since 8.2.0
+ */
+ private int numOfSearchableValuesPerTag = 2;
private Properties properties;
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index 11a39bc..765c252 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -19,7 +19,9 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
@@ -96,35 +98,60 @@ public class MySQLStorageProvider extends ModuleProvider {
mysqlClient = new JDBCHikariCPClient(config.getProperties());
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
- this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(mysqlClient));
this.registerServiceImplementation(
- INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(mysqlClient));
+ StorageDAO.class,
+ new H2StorageDAO(
+ getManager(), mysqlClient, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
+ );
+ this.registerServiceImplementation(
+ INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(mysqlClient));
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(mysqlClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(mysqlClient));
- this.registerServiceImplementation(ITraceQueryDAO.class, new MySQLTraceQueryDAO(mysqlClient));
this.registerServiceImplementation(
- IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient, config.getMetadataQueryMaxSize()));
+ ITraceQueryDAO.class,
+ new MySQLTraceQueryDAO(
+ getManager(),
+ mysqlClient,
+ config.getMaxSizeOfArrayColumn(),
+ config.getNumOfSearchableValuesPerTag()
+ )
+ );
+ this.registerServiceImplementation(
+ IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
this.registerServiceImplementation(
- IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
+ IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient));
this.registerServiceImplementation(ILogQueryDAO.class, new MySQLLogQueryDAO(mysqlClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(mysqlClient));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(mysqlClient));
this.registerServiceImplementation(
- IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(mysqlClient));
+ IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(mysqlClient));
this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(mysqlClient));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
+ final ConfigService configService = getManager().find(CoreModule.NAME)
+ .provider()
+ .getService(ConfigService.class);
+ final int numOfSearchableTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
+ if (numOfSearchableTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
+ throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTags
+ + "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ + "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ + "]. Potential out of bound in the runtime.");
+ }
+
try {
mysqlClient.connect();
- MySQLTableInstaller installer = new MySQLTableInstaller(mysqlClient, getManager());
+ MySQLTableInstaller installer = new MySQLTableInstaller(
+ mysqlClient, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag()
+ );
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
@@ -138,6 +165,6 @@ public class MySQLStorageProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
- return new String[]{CoreModule.NAME};
+ return new String[] {CoreModule.NAME};
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
index 55a6b51..62f615f 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.ExtraQueryIndex;
@@ -40,8 +41,11 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstal
*/
@Slf4j
public class MySQLTableInstaller extends H2TableInstaller {
- public MySQLTableInstaller(Client client, ModuleManager moduleManager) {
- super(client, moduleManager);
+ public MySQLTableInstaller(Client client,
+ ModuleManager moduleManager,
+ int maxSizeOfArrayColumn,
+ int numOfSearchableValuesPerTag) {
+ super(client, moduleManager, maxSizeOfArrayColumn, numOfSearchableValuesPerTag);
/*
* Override column because the default column names in core have syntax conflict with MySQL.
*/
@@ -72,24 +76,39 @@ public class MySQLTableInstaller extends H2TableInstaller {
int indexSeq = 0;
for (final ModelColumn modelColumn : model.getColumns()) {
if (!modelColumn.isStorageOnly()) {
- SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
- tableIndexSQL.append(model.getName().toUpperCase())
- .append("_")
- .append(String.valueOf(indexSeq++))
- .append("_IDX ");
- tableIndexSQL.append("ON ").append(model.getName()).append("(")
- .append(modelColumn.getColumnName().getStorageName())
- .append(")");
- createIndex(client, connection, model, tableIndexSQL);
+ final Class<?> type = modelColumn.getType();
+ if (List.class.isAssignableFrom(type)) {
+ for (int i = 0; i < maxSizeOfArrayColumn; i++) {
+ SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
+ tableIndexSQL.append(model.getName().toUpperCase())
+ .append("_")
+ .append(String.valueOf(indexSeq++))
+ .append("_IDX ");
+ tableIndexSQL.append("ON ").append(model.getName()).append("(")
+ .append(modelColumn.getColumnName().getStorageName() + "_" + i)
+ .append(")");
+ createIndex(client, connection, model, tableIndexSQL);
+ }
+ } else {
+ SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
+ tableIndexSQL.append(model.getName().toUpperCase())
+ .append("_")
+ .append(String.valueOf(indexSeq++))
+ .append("_IDX ");
+ tableIndexSQL.append("ON ").append(model.getName()).append("(")
+ .append(modelColumn.getColumnName().getStorageName())
+ .append(")");
+ createIndex(client, connection, model, tableIndexSQL);
+ }
}
}
for (final ExtraQueryIndex extraQueryIndex : model.getExtraQueryIndices()) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase())
- .append("_")
- .append(String.valueOf(indexSeq++))
- .append("_IDX ");
+ .append("_")
+ .append(String.valueOf(indexSeq++))
+ .append("_IDX ");
tableIndexSQL.append(" ON ").append(model.getName()).append("(");
final String[] columns = extraQueryIndex.getColumns();
for (int i = 0; i < columns.length; i++) {
@@ -104,17 +123,18 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
@Override
- protected String getColumnType(final ModelColumn column) {
+ protected String getColumn(final ModelColumn column) {
+ final String storageName = column.getColumnName().getStorageName();
final Class<?> type = column.getType();
if (StorageDataComplexObject.class.isAssignableFrom(type)) {
- return "MEDIUMTEXT";
+ return storageName + " MEDIUMTEXT";
} else if (String.class.equals(type)) {
if (column.getLength() > 16383) {
- return "MEDIUMTEXT";
+ return storageName + " MEDIUMTEXT";
} else {
- return "VARCHAR(" + column.getLength() + ")";
+ return storageName + " VARCHAR(" + column.getLength() + ")";
}
}
- return super.getColumnType(column);
+ return super.getColumn(column);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java
index 7cb6a38..fd1f881 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java
@@ -19,12 +19,16 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
public class MySQLTraceQueryDAO extends H2TraceQueryDAO {
- public MySQLTraceQueryDAO(JDBCHikariCPClient mysqlClient) {
- super(mysqlClient);
+ public MySQLTraceQueryDAO(ModuleManager manager,
+ JDBCHikariCPClient h2Client,
+ final int maxSizeOfArrayColumn,
+ final int numOfSearchableValuesPerTag) {
+ super(manager, h2Client, maxSizeOfArrayColumn, numOfSearchableValuesPerTag);
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
index 45f090a..bec545a 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
@@ -86,7 +87,8 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
- QueryOrder queryOrder) throws IOException {
+ QueryOrder queryOrder,
+ final List<SpanTag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
diff --git a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java
index fe499d3..7c3a4ba 100644
--- a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java
+++ b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
@@ -50,7 +51,8 @@ public class ProfileTraceDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
- QueryOrder queryOrder) throws IOException {
+ QueryOrder queryOrder,
+ final List<SpanTag> tags) throws IOException {
return null;
}