You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/09/17 02:33:53 UTC

[skywalking] 01/01: Replace `mget` with ids' query as `mget` doesn't apply on aliases

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch bug/es
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit b396ca10f8fa04d6b0e7eda1fa30c556a12b0784
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Sep 17 10:33:04 2021 +0800

    Replace `mget` with ids' query as `mget` doesn't apply on aliases
---
 .../client/elasticsearch/ElasticSearchClient.java  | 10 +++++--
 .../elasticsearch/client/DocumentClient.java       | 28 ------------------
 .../plugin/elasticsearch/base/MetricsEsDAO.java    | 11 ++++----
 .../elasticsearch/query/MetricsQueryEsDAO.java     | 33 +++++++++++-----------
 4 files changed, 28 insertions(+), 54 deletions(-)

diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 912c235..b6801bf 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -37,9 +38,9 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearch;
 import org.apache.skywalking.library.elasticsearch.ElasticSearchBuilder;
 import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
 import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
+import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
 import org.apache.skywalking.library.elasticsearch.response.Index;
 import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
 import org.apache.skywalking.library.elasticsearch.response.Mappings;
@@ -274,10 +275,13 @@ public class ElasticSearchClient implements Client, HealthCheckable {
         return es.get().documents().exists(indexName, TYPE, id);
     }
 
-    public Optional<Documents> ids(String indexName, Iterable<String> ids) {
+    public SearchResponse ids(String indexName, Iterable<String> ids) {
         indexName = indexNameConverter.apply(indexName);
 
-        return es.get().documents().mget(indexName, TYPE, ids);
+        return es.get().search(Search.builder()
+                                     .size(Iterables.size(ids))
+                                     .query(Query.ids(ids))
+                                     .build(), indexName);
     }
 
     public void forceInsert(String indexName, String id, Map<String, Object> source) {
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
index d28ff55..f8023f6 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
@@ -32,7 +32,6 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
 import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
 import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
 import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -85,33 +84,6 @@ public final class DocumentClient {
     }
 
     @SneakyThrows
-    public Optional<Documents> mget(String index, String type, Iterable<String> ids) {
-        final CompletableFuture<Optional<Documents>> future =
-            version.thenCompose(
-                v -> client.execute(v.requestFactory().document().mget(index, type, ids))
-                           .aggregate().thenApply(response -> {
-                        if (response.status() != HttpStatus.OK) {
-                            throw new RuntimeException(response.contentUtf8());
-                        }
-
-                        try (final HttpData content = response.content();
-                             final InputStream is = content.toInputStream()) {
-                            return Optional.of(v.codec().decode(is, Documents.class));
-                        } catch (Exception e) {
-                            return Exceptions.throwUnsafely(e);
-                        }
-                    }));
-        future.whenComplete((result, exception) -> {
-            if (exception != null) {
-                log.error("Failed to get by ids, {} {}", index, ids, exception);
-            } else if (log.isDebugEnabled()) {
-                log.debug("Succeeded to get docs by ids: {} {} {}", index, ids, result);
-            }
-        });
-        return future.get();
-    }
-
-    @SneakyThrows
     public void index(IndexRequest request, Map<String, Object> params) {
         final CompletableFuture<Void> future = version.thenCompose(
             v -> client.execute(v.requestFactory().document().index(request, params))
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 490af1c..c9a1695 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -88,11 +88,10 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
             List<String> ids = metricList.stream()
                                          .map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
                                          .collect(Collectors.toList());
-            getClient().ids(tableName, ids).ifPresent(documents -> {
-                for (final Document doc : documents) {
-                    Metrics source = storageBuilder.storage2Entity(doc.getSource());
-                    result.add(source);
-                }
+            final SearchResponse response = getClient().ids(tableName, ids);
+            response.getHits().getHits().forEach(hit -> {
+                Metrics source = storageBuilder.storage2Entity(hit.getSource());
+                result.add(source);
             });
         });
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 69a079f..e3f9f44 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
@@ -31,8 +30,8 @@ import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
 import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.TermsAggregationBuilder;
-import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHits;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics;
@@ -110,12 +109,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
         }).collect(Collectors.toList());
         MetricsValues metricsValues = new MetricsValues();
 
-        Optional<Documents> response = getClient().ids(tableName, ids);
-        if (!response.isPresent()) {
+        SearchResponse response = getClient().ids(tableName, ids);
+        if (response.getHits().getHits().isEmpty()) {
             return metricsValues;
         }
 
-        Map<String, Map<String, Object>> idMap = toMap(response.get());
+        Map<String, Map<String, Object>> idMap = toMap(response.getHits());
 
         // Label is null, because in readMetricsValues, no label parameter.
         IntValues intValues = metricsValues.getValues();
@@ -158,15 +157,15 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
             ids.add(id);
         });
 
-        Optional<Documents> response = getClient().ids(tableName, ids);
-        if (!response.isPresent()) {
+        SearchResponse response = getClient().ids(tableName, ids);
+        if (response.getHits().getHits().isEmpty()) {
             return Collections.emptyList();
         }
         Map<String, DataTable> idMap = new HashMap<>();
-        for (final Document document : response.get()) {
+        for (final SearchHit hit : response.getHits()) {
             idMap.put(
-                document.getId(),
-                new DataTable((String) document.getSource().getOrDefault(valueColumnName, ""))
+                hit.getId(),
+                new DataTable((String) hit.getSource().getOrDefault(valueColumnName, ""))
             );
         }
         return Util.composeLabelValue(condition, labels, ids, idMap);
@@ -191,11 +190,11 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
 
         HeatMap heatMap = new HeatMap();
 
-        Optional<Documents> response = getClient().ids(tableName, ids);
-        if (!response.isPresent()) {
+        SearchResponse response = getClient().ids(tableName, ids);
+        if (response.getHits().getHits().isEmpty()) {
             return heatMap;
         }
-        Map<String, Map<String, Object>> idMap = toMap(response.get());
+        Map<String, Map<String, Object>> idMap = toMap(response.getHits());
 
         final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         for (String id : ids) {
@@ -271,10 +270,10 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
         return sourceBuilder;
     }
 
-    private Map<String, Map<String, Object>> toMap(Documents documents) {
+    private Map<String, Map<String, Object>> toMap(SearchHits hits) {
         Map<String, Map<String, Object>> result = new HashMap<>();
-        for (final Document document : documents) {
-            result.put(document.getId(), document.getSource());
+        for (final SearchHit hit : hits) {
+            result.put(hit.getId(), hit.getSource());
         }
         return result;
     }