You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2022/04/28 02:18:53 UTC

[skywalking] branch clean-scroll-context created (now fa1ddee0b5)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a change to branch clean-scroll-context
in repository https://gitbox.apache.org/repos/asf/skywalking.git


      at fa1ddee0b5 Clean up scroll contexts after used

This branch includes the following new commits:

     new fa1ddee0b5 Clean up scroll contexts after used

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Clean up scroll contexts after used

Posted by ke...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch clean-scroll-context
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit fa1ddee0b5350aa761a6a542fe920c5b6610afc0
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Thu Apr 28 09:22:21 2022 +0800

    Clean up scroll contexts after used
---
 .github/workflows/skywalking.yaml                  |  3 ++
 docs/en/changes/changes.md                         |  1 +
 .../client/elasticsearch/ElasticSearchClient.java  | 14 +++++---
 .../library/elasticsearch/ElasticSearch.java       |  4 +++
 .../library/elasticsearch/client/SearchClient.java | 23 ++++++++++++++
 .../requests/factory/SearchFactory.java            |  5 +++
 .../factory/common/CommonSearchFactory.java        | 23 ++++++++++++--
 .../cache/NetworkAddressAliasEsDAO.java            | 37 ++++++++++++++--------
 .../query/EBPFProfilingDataEsDAO.java              | 28 ++++++++++------
 .../elasticsearch/query/MetadataQueryEsDAO.java    | 36 ++++++++++++---------
 10 files changed, 128 insertions(+), 46 deletions(-)

diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml
index 3f7d4a6bc5..21a2a041ef 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -17,6 +17,9 @@
 name: CI
 
 on:
+  push:
+    branches:
+      - "clean-scroll-context"
   pull_request:
     paths-ignore:
       - "**.md"
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 7f365445a4..f4dbdeadea 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -28,6 +28,7 @@
 * Fix the problem that some configurations (such as group.id) did not take effect due to the override order when using the kafkaConsumerConfig property to extend the configuration in Kafka Fetcher.
 * Remove build time from the OAP version.
 * Add data-generator module to run OAP in testing mode, generating mock data for testing.
+* Clean up scroll contexts after used.
 
 #### UI
 
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index cdbd8750ee..482ed58745 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -255,10 +255,7 @@ public class ElasticSearchClient implements Client, HealthCheckable {
                   .toArray(String[]::new);
         return es.get().search(
             search,
-            new SearchParams()
-                .ignoreUnavailable(true)
-                .allowNoIndices(true)
-                .expandWildcards("open"),
+            null,
             indexNames);
     }
 
@@ -278,6 +275,15 @@ public class ElasticSearchClient implements Client, HealthCheckable {
         return es.get().scroll(contextRetention, scrollId);
     }
 
+    public boolean deleteScrollContextQuietly(String scrollId) {
+        try {
+            return es.get().deleteScrollContext(scrollId);
+        } catch (Exception e) {
+            log.warn("Failed to delete scroll context: {}", scrollId, e);
+            return false;
+        }
+    }
+
     public Optional<Document> get(String indexName, String id) {
         indexName = indexNameConverter.apply(indexName);
 
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
index acb7fdaea8..de6b7414f7 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
@@ -187,6 +187,10 @@ public final class ElasticSearch implements Closeable {
                   .build());
     }
 
+    public boolean deleteScrollContext(String scrollId) {
+        return searchClient.deleteScrollContext(scrollId);
+    }
+
     @Override
     public void close() {
         endpointGroup.removeListener(healthyEndpointListener);
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
index b442605850..17aff2b3ab 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/SearchClient.java
@@ -102,4 +102,27 @@ public final class SearchClient {
         });
         return future.get();
     }
+
+    @SneakyThrows
+    public boolean deleteScrollContext(String scrollId) {
+        final CompletableFuture<Boolean> future =
+            version.thenCompose(
+                v -> client.execute(v.requestFactory().search().deleteScrollContext(scrollId))
+                           .aggregate().thenApply(response -> {
+                            if (response.status() == HttpStatus.OK) {
+                                return true;
+                            }
+                            throw new RuntimeException(response.contentUtf8());
+                    }));
+        future.whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.error("Failed to delete scroll context, request {}, {}", scrollId, exception);
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Succeeded to delete scroll context, {}", result);
+            }
+        });
+        return future.get();
+    }
 }
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
index 51c8f5e989..aa6bf36379 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/SearchFactory.java
@@ -33,6 +33,11 @@ public interface SearchFactory {
      */
     HttpRequest scroll(Scroll scroll);
 
+    /**
+     * Returns a request to delete the scroll context.
+     */
+    HttpRequest deleteScrollContext(String scrollId);
+
     /**
      * Returns a request to search documents.
      */
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
index 4ca76893a4..a54de130aa 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/common/CommonSearchFactory.java
@@ -17,17 +17,19 @@
 
 package org.apache.skywalking.library.elasticsearch.requests.factory.common;
 
+import java.util.HashMap;
+import java.util.Map;
 import com.linecorp.armeria.common.HttpRequest;
 import com.linecorp.armeria.common.HttpRequestBuilder;
 import com.linecorp.armeria.common.MediaType;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
 import org.apache.skywalking.library.elasticsearch.requests.factory.SearchFactory;
 import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -75,4 +77,19 @@ public final class CommonSearchFactory implements SearchFactory {
 
         return builder.content(MediaType.JSON, content).build();
     }
+
+    @SneakyThrows
+    @Override
+    public HttpRequest deleteScrollContext(String scrollId) {
+        final HttpRequestBuilder builder = HttpRequest.builder().delete("/_search/scroll");
+        final Map<String, String> params = new HashMap<>();
+        params.put("scroll_id", scrollId);
+        final byte[] content = version.codec().encode(params);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Delete scroll context request: {}", new String(content));
+        }
+
+        return builder.content(MediaType.JSON, content).build();
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
index 80b207ab42..4ef681e038 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
@@ -60,21 +60,30 @@ public class NetworkAddressAliasEsDAO extends EsDAO implements INetworkAddressAl
             final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
             final NetworkAddressAlias.Builder builder = new NetworkAddressAlias.Builder();
 
-            SearchResponse results =
-                getClient().search(NetworkAddressAlias.INDEX_NAME, search, params);
-            while (results.getHits().getTotal() > 0) {
-                for (SearchHit searchHit : results.getHits()) {
-                    networkAddressAliases.add(
-                        builder.storage2Entity(new HashMapConverter.ToEntity(searchHit.getSource())));
+            do {
+                SearchResponse results =
+                    getClient().search(NetworkAddressAlias.INDEX_NAME, search, params);
+                final String scrollId = results.getScrollId();
+                try {
+                    if (results.getHits().getTotal() == 0) {
+                        break;
+                    }
+                    for (SearchHit searchHit : results.getHits()) {
+                        networkAddressAliases.add(
+                            builder.storage2Entity(
+                                new HashMapConverter.ToEntity(searchHit.getSource())));
+                    }
+                    if (results.getHits().getTotal() < batchSize) {
+                        break;
+                    }
+                    if (networkAddressAliases.size() >= resultWindowMaxSize) {
+                        break;
+                    }
+                    results = getClient().scroll(SCROLL_CONTEXT_RETENTION, scrollId);
+                } finally {
+                    getClient().deleteScrollContextQuietly(scrollId);
                 }
-                if (results.getHits().getTotal() < batchSize) {
-                    break;
-                }
-                if (networkAddressAliases.size() >= resultWindowMaxSize) {
-                    break;
-                }
-                results = getClient().scroll(SCROLL_CONTEXT_RETENTION, results.getScrollId());
-            }
+            } while (true);
         } catch (Throwable t) {
             log.error(t.getMessage(), t);
         }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
index 47dd3c2b1d..65342148d3 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
@@ -58,16 +58,24 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataD
         final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
         final List<EBPFProfilingDataRecord> records = new ArrayList<>();
 
-        SearchResponse results = getClient().search(index, search.build(), params);
-        while (results.getHits().getTotal() > 0) {
-            final List<EBPFProfilingDataRecord> batch = buildDataList(results);
-            records.addAll(batch);
-            // The last iterate, there is no more data
-            if (batch.size() < scrollingBatchSize) {
-                break;
+        do {
+            SearchResponse results = getClient().search(index, search.build(), params);
+            final String scrollId = results.getScrollId();
+            try {
+                if (results.getHits().getTotal() == 0) {
+                    break;
+                }
+                final List<EBPFProfilingDataRecord> batch = buildDataList(results);
+                records.addAll(batch);
+                // The last iterate, there is no more data
+                if (batch.size() < scrollingBatchSize) {
+                    break;
+                }
+                results = getClient().scroll(SCROLL_CONTEXT_RETENTION, scrollId);
+            } finally {
+                getClient().deleteScrollContextQuietly(scrollId);
             }
-            results = getClient().scroll(SCROLL_CONTEXT_RETENTION, results.getScrollId());
-        }
+        } while (true);
         return records;
     }
 
@@ -80,4 +88,4 @@ public class EBPFProfilingDataEsDAO extends EsDAO implements IEBPFProfilingDataD
         }
         return records;
     }
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index d4d4c3aee6..8f06d13988 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
@@ -58,7 +57,6 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModu
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
-
 import static org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE;
 
 public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
@@ -93,20 +91,28 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
         final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
         final List<Service> services = new ArrayList<>();
 
-        SearchResponse results = getClient().search(index, search.build(), params);
-        while (results.getHits().getTotal() > 0) {
-            final List<Service> batch = buildServices(results);
-            services.addAll(batch);
-            // The last iterate, there is no more data
-            if (batch.size() < batchSize) {
-                break;
-            }
-            // We've got enough data
-            if (services.size() >= queryMaxSize) {
-                break;
+        do {
+            SearchResponse results = getClient().search(index, search.build(), params);
+            final String scrollId = results.getScrollId();
+            try {
+                if (results.getHits().getTotal() == 0) {
+                    break;
+                }
+                final List<Service> batch = buildServices(results);
+                services.addAll(batch);
+                // The last iterate, there is no more data
+                if (batch.size() < batchSize) {
+                    break;
+                }
+                // We've got enough data
+                if (services.size() >= queryMaxSize) {
+                    break;
+                }
+                results = getClient().scroll(SCROLL_CONTEXT_RETENTION, scrollId);
+            } finally {
+                getClient().deleteScrollContextQuietly(scrollId);
             }
-            results = getClient().scroll(SCROLL_CONTEXT_RETENTION, results.getScrollId());
-        }
+        } while (true);
         return services;
     }