You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/09/17 02:33:52 UTC

[skywalking] branch bug/es created (now b396ca1)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a change to branch bug/es
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at b396ca1  Replace `mget` with ids' query as `mget` doesn't apply on aliases

This branch includes the following new commits:

     new b396ca1  Replace `mget` with ids' query as `mget` doesn't apply on aliases

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking] 01/01: Replace `mget` with ids' query as `mget` doesn't apply on aliases

Posted by ke...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch bug/es
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit b396ca10f8fa04d6b0e7eda1fa30c556a12b0784
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Sep 17 10:33:04 2021 +0800

    Replace `mget` with ids' query as `mget` doesn't apply on aliases
---
 .../client/elasticsearch/ElasticSearchClient.java  | 10 +++++--
 .../elasticsearch/client/DocumentClient.java       | 28 ------------------
 .../plugin/elasticsearch/base/MetricsEsDAO.java    | 11 ++++----
 .../elasticsearch/query/MetricsQueryEsDAO.java     | 33 +++++++++++-----------
 4 files changed, 28 insertions(+), 54 deletions(-)

diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 912c235..b6801bf 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -37,9 +38,9 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearch;
 import org.apache.skywalking.library.elasticsearch.ElasticSearchBuilder;
 import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
 import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
+import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
 import org.apache.skywalking.library.elasticsearch.response.Index;
 import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
 import org.apache.skywalking.library.elasticsearch.response.Mappings;
@@ -274,10 +275,13 @@ public class ElasticSearchClient implements Client, HealthCheckable {
         return es.get().documents().exists(indexName, TYPE, id);
     }
 
-    public Optional<Documents> ids(String indexName, Iterable<String> ids) {
+    public SearchResponse ids(String indexName, Iterable<String> ids) {
         indexName = indexNameConverter.apply(indexName);
 
-        return es.get().documents().mget(indexName, TYPE, ids);
+        return es.get().search(Search.builder()
+                                     .size(Iterables.size(ids))
+                                     .query(Query.ids(ids))
+                                     .build(), indexName);
     }
 
     public void forceInsert(String indexName, String id, Map<String, Object> source) {
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
index d28ff55..f8023f6 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
@@ -32,7 +32,6 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
 import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
 import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
 import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -85,33 +84,6 @@ public final class DocumentClient {
     }
 
     @SneakyThrows
-    public Optional<Documents> mget(String index, String type, Iterable<String> ids) {
-        final CompletableFuture<Optional<Documents>> future =
-            version.thenCompose(
-                v -> client.execute(v.requestFactory().document().mget(index, type, ids))
-                           .aggregate().thenApply(response -> {
-                        if (response.status() != HttpStatus.OK) {
-                            throw new RuntimeException(response.contentUtf8());
-                        }
-
-                        try (final HttpData content = response.content();
-                             final InputStream is = content.toInputStream()) {
-                            return Optional.of(v.codec().decode(is, Documents.class));
-                        } catch (Exception e) {
-                            return Exceptions.throwUnsafely(e);
-                        }
-                    }));
-        future.whenComplete((result, exception) -> {
-            if (exception != null) {
-                log.error("Failed to get by ids, {} {}", index, ids, exception);
-            } else if (log.isDebugEnabled()) {
-                log.debug("Succeeded to get docs by ids: {} {} {}", index, ids, result);
-            }
-        });
-        return future.get();
-    }
-
-    @SneakyThrows
     public void index(IndexRequest request, Map<String, Object> params) {
         final CompletableFuture<Void> future = version.thenCompose(
             v -> client.execute(v.requestFactory().document().index(request, params))
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 490af1c..c9a1695 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -88,11 +88,10 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
             List<String> ids = metricList.stream()
                                          .map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
                                          .collect(Collectors.toList());
-            getClient().ids(tableName, ids).ifPresent(documents -> {
-                for (final Document doc : documents) {
-                    Metrics source = storageBuilder.storage2Entity(doc.getSource());
-                    result.add(source);
-                }
+            final SearchResponse response = getClient().ids(tableName, ids);
+            response.getHits().getHits().forEach(hit -> {
+                Metrics source = storageBuilder.storage2Entity(hit.getSource());
+                result.add(source);
             });
         });
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 69a079f..e3f9f44 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
@@ -31,8 +30,8 @@ import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
 import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.TermsAggregationBuilder;
-import org.apache.skywalking.library.elasticsearch.response.Document;
-import org.apache.skywalking.library.elasticsearch.response.Documents;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHits;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics;
@@ -110,12 +109,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
         }).collect(Collectors.toList());
         MetricsValues metricsValues = new MetricsValues();
 
-        Optional<Documents> response = getClient().ids(tableName, ids);
-        if (!response.isPresent()) {
+        SearchResponse response = getClient().ids(tableName, ids);
+        if (response.getHits().getHits().isEmpty()) {
             return metricsValues;
         }
 
-        Map<String, Map<String, Object>> idMap = toMap(response.get());
+        Map<String, Map<String, Object>> idMap = toMap(response.getHits());
 
         // Label is null, because in readMetricsValues, no label parameter.
         IntValues intValues = metricsValues.getValues();
@@ -158,15 +157,15 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
             ids.add(id);
         });
 
-        Optional<Documents> response = getClient().ids(tableName, ids);
-        if (!response.isPresent()) {
+        SearchResponse response = getClient().ids(tableName, ids);
+        if (response.getHits().getHits().isEmpty()) {
             return Collections.emptyList();
         }
         Map<String, DataTable> idMap = new HashMap<>();
-        for (final Document document : response.get()) {
+        for (final SearchHit hit : response.getHits()) {
             idMap.put(
-                document.getId(),
-                new DataTable((String) document.getSource().getOrDefault(valueColumnName, ""))
+                hit.getId(),
+                new DataTable((String) hit.getSource().getOrDefault(valueColumnName, ""))
             );
         }
         return Util.composeLabelValue(condition, labels, ids, idMap);
@@ -191,11 +190,11 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
 
         HeatMap heatMap = new HeatMap();
 
-        Optional<Documents> response = getClient().ids(tableName, ids);
-        if (!response.isPresent()) {
+        SearchResponse response = getClient().ids(tableName, ids);
+        if (response.getHits().getHits().isEmpty()) {
             return heatMap;
         }
-        Map<String, Map<String, Object>> idMap = toMap(response.get());
+        Map<String, Map<String, Object>> idMap = toMap(response.getHits());
 
         final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         for (String id : ids) {
@@ -271,10 +270,10 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
         return sourceBuilder;
     }
 
-    private Map<String, Map<String, Object>> toMap(Documents documents) {
+    private Map<String, Map<String, Object>> toMap(SearchHits hits) {
         Map<String, Map<String, Object>> result = new HashMap<>();
-        for (final Document document : documents) {
-            result.put(document.getId(), document.getSource());
+        for (final SearchHit hit : hits) {
+            result.put(hit.getId(), hit.getSource());
         }
         return result;
     }