You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/09/17 09:54:43 UTC
[skywalking] branch master updated: Replace `mget` with ids' query as `mget` doesn't apply on aliases (#7731)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new ad84201 Replace `mget` with ids' query as `mget` doesn't apply on aliases (#7731)
ad84201 is described below
commit ad842013f4c2d9f80f0b217fed8a3419b71d725a
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Sep 17 17:54:26 2021 +0800
Replace `mget` with ids' query as `mget` doesn't apply on aliases (#7731)
---
.../client/elasticsearch/ElasticSearchClient.java | 10 +++++--
.../elasticsearch/client/DocumentClient.java | 28 ------------------
.../plugin/elasticsearch/base/MetricsEsDAO.java | 11 ++++----
.../elasticsearch/query/MetricsQueryEsDAO.java | 33 +++++++++++-----------
4 files changed, 28 insertions(+), 54 deletions(-)
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 912c235..b6801bf 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -37,9 +38,9 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearch;
import org.apache.skywalking.library.elasticsearch.ElasticSearchBuilder;
import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
+import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.library.elasticsearch.response.Index;
import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
@@ -274,10 +275,13 @@ public class ElasticSearchClient implements Client, HealthCheckable {
return es.get().documents().exists(indexName, TYPE, id);
}
- public Optional<Documents> ids(String indexName, Iterable<String> ids) {
+ public SearchResponse ids(String indexName, Iterable<String> ids) {
indexName = indexNameConverter.apply(indexName);
- return es.get().documents().mget(indexName, TYPE, ids);
+ return es.get().search(Search.builder()
+ .size(Iterables.size(ids))
+ .query(Query.ids(ids))
+ .build(), indexName);
}
public void forceInsert(String indexName, String id, Map<String, Object> source) {
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
index d28ff55..f8023f6 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
@@ -32,7 +32,6 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
@Slf4j
@RequiredArgsConstructor
@@ -85,33 +84,6 @@ public final class DocumentClient {
}
@SneakyThrows
- public Optional<Documents> mget(String index, String type, Iterable<String> ids) {
- final CompletableFuture<Optional<Documents>> future =
- version.thenCompose(
- v -> client.execute(v.requestFactory().document().mget(index, type, ids))
- .aggregate().thenApply(response -> {
- if (response.status() != HttpStatus.OK) {
- throw new RuntimeException(response.contentUtf8());
- }
-
- try (final HttpData content = response.content();
- final InputStream is = content.toInputStream()) {
- return Optional.of(v.codec().decode(is, Documents.class));
- } catch (Exception e) {
- return Exceptions.throwUnsafely(e);
- }
- }));
- future.whenComplete((result, exception) -> {
- if (exception != null) {
- log.error("Failed to get by ids, {} {}", index, ids, exception);
- } else if (log.isDebugEnabled()) {
- log.debug("Succeeded to get docs by ids: {} {} {}", index, ids, result);
- }
- });
- return future.get();
- }
-
- @SneakyThrows
public void index(IndexRequest request, Map<String, Object> params) {
final CompletableFuture<Void> future = version.thenCompose(
v -> client.execute(v.requestFactory().document().index(request, params))
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 490af1c..c9a1695 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -88,11 +88,10 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
List<String> ids = metricList.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.collect(Collectors.toList());
- getClient().ids(tableName, ids).ifPresent(documents -> {
- for (final Document doc : documents) {
- Metrics source = storageBuilder.storage2Entity(doc.getSource());
- result.add(source);
- }
+ final SearchResponse response = getClient().ids(tableName, ids);
+ response.getHits().getHits().forEach(hit -> {
+ Metrics source = storageBuilder.storage2Entity(hit.getSource());
+ result.add(source);
});
});
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 69a079f..e3f9f44 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
@@ -31,8 +30,8 @@ import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.TermsAggregationBuilder;
-import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHits;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics;
@@ -110,12 +109,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
}).collect(Collectors.toList());
MetricsValues metricsValues = new MetricsValues();
- Optional<Documents> response = getClient().ids(tableName, ids);
- if (!response.isPresent()) {
+ SearchResponse response = getClient().ids(tableName, ids);
+ if (response.getHits().getHits().isEmpty()) {
return metricsValues;
}
- Map<String, Map<String, Object>> idMap = toMap(response.get());
+ Map<String, Map<String, Object>> idMap = toMap(response.getHits());
// Label is null, because in readMetricsValues, no label parameter.
IntValues intValues = metricsValues.getValues();
@@ -158,15 +157,15 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
ids.add(id);
});
- Optional<Documents> response = getClient().ids(tableName, ids);
- if (!response.isPresent()) {
+ SearchResponse response = getClient().ids(tableName, ids);
+ if (response.getHits().getHits().isEmpty()) {
return Collections.emptyList();
}
Map<String, DataTable> idMap = new HashMap<>();
- for (final Document document : response.get()) {
+ for (final SearchHit hit : response.getHits()) {
idMap.put(
- document.getId(),
- new DataTable((String) document.getSource().getOrDefault(valueColumnName, ""))
+ hit.getId(),
+ new DataTable((String) hit.getSource().getOrDefault(valueColumnName, ""))
);
}
return Util.composeLabelValue(condition, labels, ids, idMap);
@@ -191,11 +190,11 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
HeatMap heatMap = new HeatMap();
- Optional<Documents> response = getClient().ids(tableName, ids);
- if (!response.isPresent()) {
+ SearchResponse response = getClient().ids(tableName, ids);
+ if (response.getHits().getHits().isEmpty()) {
return heatMap;
}
- Map<String, Map<String, Object>> idMap = toMap(response.get());
+ Map<String, Map<String, Object>> idMap = toMap(response.getHits());
final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
for (String id : ids) {
@@ -271,10 +270,10 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
return sourceBuilder;
}
- private Map<String, Map<String, Object>> toMap(Documents documents) {
+ private Map<String, Map<String, Object>> toMap(SearchHits hits) {
Map<String, Map<String, Object>> result = new HashMap<>();
- for (final Document document : documents) {
- result.put(document.getId(), document.getSource());
+ for (final SearchHit hit : hits) {
+ result.put(hit.getId(), hit.getSource());
}
return result;
}