You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/04/01 16:54:56 UTC

[incubator-skywalking] branch jaeger-receiver updated: Revert "Provide create, delete, isExists template method by elastic client. (#2425)"

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch jaeger-receiver
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/jaeger-receiver by this push:
     new 48b0501  Revert "Provide create, delete, isExists template method by elastic client. (#2425)"
48b0501 is described below

commit 48b0501cf262530778aa45f1ef351a46f8599768
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Apr 1 09:51:25 2019 -0700

    Revert "Provide create, delete, isExists template method by elastic client. (#2425)"
    
    This reverts commit e6a7720f0357f2cff9077a70e4c135d704feb2b6.
---
 oap-server/pom.xml                                 |   2 +-
 .../client/elasticsearch/ElasticSearchClient.java  | 155 ++++++++++-----------
 2 files changed, 76 insertions(+), 81 deletions(-)

diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index f663e2d..dba0370 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -62,7 +62,7 @@
         <shardingjdbc.version>2.0.3</shardingjdbc.version>
         <commons-dbcp.version>1.4</commons-dbcp.version>
         <commons-io.version>2.6</commons-io.version>
-        <elasticsearch.version>6.7.0</elasticsearch.version>
+        <elasticsearch.version>6.3.2</elasticsearch.version>
         <joda-time.version>2.9.9</joda-time.version>
         <kubernetes.version>2.0.0</kubernetes.version>
         <hikaricp.version>3.1.0</hikaricp.version>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 1d57a0d..0e65d72 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -18,35 +18,53 @@
 
 package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.function.BiConsumer;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
-import org.apache.http.auth.*;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.nio.entity.NStringEntity;
 import org.apache.skywalking.oap.server.library.client.Client;
-import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
-import org.elasticsearch.action.bulk.*;
-import org.elasticsearch.action.get.*;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.get.MultiGetRequest;
+import org.elasticsearch.action.get.MultiGetResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.*;
-import org.elasticsearch.client.indices.*;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.*;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.reindex.*;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @author peng-yongsheng
@@ -76,7 +94,13 @@ public class ElasticSearchClient implements Client {
             final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
             credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
             builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
-                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
+                        @Override
+                        public HttpAsyncClientBuilder customizeHttpClient(
+                                HttpAsyncClientBuilder httpClientBuilder) {
+                            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                        }
+                    });
         } else {
             builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
         }
@@ -109,8 +133,8 @@ public class ElasticSearchClient implements Client {
         indexName = formatIndexName(indexName);
         CreateIndexRequest request = new CreateIndexRequest(indexName);
         request.settings(settings);
-        request.mapping(mappingBuilder);
-        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
+        request.mapping(TYPE, mappingBuilder);
+        CreateIndexResponse response = client.indices().create(request);
         logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
@@ -118,44 +142,17 @@ public class ElasticSearchClient implements Client {
     public boolean deleteIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
         DeleteIndexRequest request = new DeleteIndexRequest(indexName);
-        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
+        DeleteIndexResponse response;
+        response = client.indices().delete(request);
         logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
     public boolean isExistsIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-        GetIndexRequest request = new GetIndexRequest(indexName);
-        return client.indices().exists(request, RequestOptions.DEFAULT);
-    }
-
-    public boolean isExistsTemplate(String indexName) throws IOException {
-        indexName = formatIndexName(indexName);
-        IndexTemplatesExistRequest request = new IndexTemplatesExistRequest(indexName);
-        return client.indices().existsTemplate(request, RequestOptions.DEFAULT);
-    }
-
-    public boolean createTemplate(String indexName, Settings settings,
-        XContentBuilder mappingBuilder) throws IOException {
-        indexName = formatIndexName(indexName);
-
-        org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest request = new PutIndexTemplateRequest(indexName);
-        request.patterns(Collections.singletonList(indexName + "*"));
-        request.settings(settings);
-        request.mapping("_doc", mappingBuilder);
-
-        AcknowledgedResponse response = client.indices().putTemplate(request, RequestOptions.DEFAULT);
-        logger.debug("create {} template finished, isAcknowledged: {}", indexName, response.isAcknowledged());
-        return response.isAcknowledged();
-    }
-
-    public boolean deleteTemplate(String indexName) throws IOException {
-        indexName = formatIndexName(indexName);
-
-        DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest();
-        request.name(indexName);
-        AcknowledgedResponse deleteTemplateAcknowledge = client.indices().deleteTemplate(request, RequestOptions.DEFAULT);
-        return deleteTemplateAcknowledge.isAcknowledged();
+        GetIndexRequest request = new GetIndexRequest();
+        request.indices(indexName);
+        return client.indices().exists(request);
     }
 
     public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
@@ -163,39 +160,39 @@ public class ElasticSearchClient implements Client {
         SearchRequest searchRequest = new SearchRequest(indexName);
         searchRequest.types(TYPE);
         searchRequest.source(searchSourceBuilder);
-        return client.search(searchRequest, RequestOptions.DEFAULT);
+        return client.search(searchRequest);
     }
 
     public GetResponse get(String indexName, String id) throws IOException {
         indexName = formatIndexName(indexName);
         GetRequest request = new GetRequest(indexName, TYPE, id);
-        return client.get(request, RequestOptions.DEFAULT);
+        return client.get(request);
     }
 
     public MultiGetResponse multiGet(String indexName, List<String> ids) throws IOException {
         final String newIndexName = formatIndexName(indexName);
         MultiGetRequest request = new MultiGetRequest();
         ids.forEach(id -> request.add(newIndexName, TYPE, id));
-        return client.mget(request, RequestOptions.DEFAULT);
+        return client.multiGet(request);
     }
 
     public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
         IndexRequest request = prepareInsert(indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.index(request, RequestOptions.DEFAULT);
+        client.index(request);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
         UpdateRequest request = prepareUpdate(indexName, id, source);
         request.version(version);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request, RequestOptions.DEFAULT);
+        client.update(request);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
         UpdateRequest request = prepareUpdate(indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request, RequestOptions.DEFAULT);
+        client.update(request);
     }
 
     public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
@@ -210,14 +207,20 @@ public class ElasticSearchClient implements Client {
 
     public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
         indexName = formatIndexName(indexName);
-
-        DeleteByQueryRequest request = new DeleteByQueryRequest();
-        request.indices(indexName);
-        request.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
-
-        BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
-        logger.debug("Delete data from index {}, deleted {}", indexName, response.getDeleted());
-        return 200;
+        Map<String, String> params = Collections.singletonMap("conflicts", "proceed");
+        String jsonString = "{" +
+            "  \"query\": {" +
+            "    \"range\": {" +
+            "      \"" + timeBucketColumnName + "\": {" +
+            "        \"lte\": " + endTimeBucket +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}";
+        HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
+        Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
+        logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
+        return response.getStatusLine().getStatusCode();
     }
 
     public String formatIndexName(String indexName) {
@@ -232,30 +235,22 @@ public class ElasticSearchClient implements Client {
         BulkProcessor.Listener listener = new BulkProcessor.Listener() {
             @Override
             public void beforeBulk(long executionId, BulkRequest request) {
-                int numberOfActions = request.numberOfActions();
-                logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
+
             }
 
             @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                if (response.hasFailures()) {
-                    logger.warn("Bulk [{}] executed with failures", executionId);
-                } else {
-                    logger.debug("Bulk [{}] completed in {} milliseconds",
-                        executionId, response.getTook().getMillis());
-                }
+            public void afterBulk(long executionId, BulkRequest request,
+                BulkResponse response) {
+
             }
 
             @Override
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                logger.error("Failed to execute bulk", failure);
+                logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
             }
         };
 
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
-            (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
-
-        return BulkProcessor.builder(bulkConsumer, listener)
+        return BulkProcessor.builder(client::bulkAsync, listener)
             .setBulkActions(bulkActions)
             .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
             .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))