You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/07/14 22:16:18 UTC

[skywalking] 01/02: Update Elasticsearch client version to latest

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-elasticsearch-health
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit f45a1276584a2a539e5e2fedc00af2d8a15022cb
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Jul 14 10:48:58 2020 +0800

    Update Elasticsearch client version to latest
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 oap-server/pom.xml                                 |  2 +-
 .../client/elasticsearch/ElasticSearchClient.java  | 62 +++++++++++-----------
 .../storage-elasticsearch7-plugin/pom.xml          |  2 +-
 3 files changed, 32 insertions(+), 34 deletions(-)

diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index ee05cb4..70b66e1 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -64,7 +64,7 @@
         <h2.version>1.4.196</h2.version>
         <commons-dbcp.version>1.4</commons-dbcp.version>
         <commons-io.version>2.6</commons-io.version>
-        <elasticsearch.version>6.3.2</elasticsearch.version>
+        <elasticsearch.version>6.8.10</elasticsearch.version>
         <joda-time.version>2.10.5</joda-time.version>
         <kubernetes.version>8.0.0</kubernetes.version>
         <hikaricp.version>3.1.0</hikaricp.version>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 5d2a90f..03193ea 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -60,11 +60,7 @@ import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -76,10 +72,16 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -131,7 +133,7 @@ public class ElasticSearchClient implements Client {
             }
         }
         client = createClient(hosts);
-        client.ping();
+        client.ping(RequestOptions.DEFAULT);
     }
 
     protected RestHighLevelClient createClient(
@@ -189,7 +191,7 @@ public class ElasticSearchClient implements Client {
         indexName = formatIndexName(indexName);
 
         CreateIndexRequest request = new CreateIndexRequest(indexName);
-        CreateIndexResponse response = client.indices().create(request);
+        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
         log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
@@ -200,15 +202,15 @@ public class ElasticSearchClient implements Client {
         CreateIndexRequest request = new CreateIndexRequest(indexName);
         Gson gson = new Gson();
         request.settings(gson.toJson(settings), XContentType.JSON);
-        request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON);
-        CreateIndexResponse response = client.indices().create(request);
+        request.mapping(mapping);
+        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
         log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
     public List<String> retrievalIndexByAliases(String aliases) throws IOException {
         aliases = formatIndexName(aliases);
-        Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/_alias/" + aliases);
+        Response response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases));
         if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
             Gson gson = new Gson();
             InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
@@ -245,23 +247,19 @@ public class ElasticSearchClient implements Client {
             indexName = formatIndexName(indexName);
         }
         DeleteIndexRequest request = new DeleteIndexRequest(indexName);
-        DeleteIndexResponse response;
-        response = client.indices().delete(request);
+        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
         log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
     public boolean isExistsIndex(String indexName) throws IOException {
-        indexName = formatIndexName(indexName);
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(indexName);
-        return client.indices().exists(request);
+        return client.indices().exists(new GetIndexRequest(formatIndexName(indexName)), RequestOptions.DEFAULT);
     }
 
     public boolean isExistsTemplate(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
 
-        Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
+        Response response = client.getLowLevelClient().performRequest(new Request(HttpHead.METHOD_NAME, "/_template/" + indexName));
 
         int statusCode = response.getStatusLine().getStatusCode();
         if (statusCode == HttpStatus.SC_OK) {
@@ -291,9 +289,9 @@ public class ElasticSearchClient implements Client {
 
         HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON);
 
-        Response response = client.getLowLevelClient()
-                                  .performRequest(
-                                      HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
+        Request request = new Request(HttpPut.METHOD_NAME, "/_template/" + indexName);
+        request.setEntity(entity);
+        Response response = client.getLowLevelClient().performRequest(request);
         return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
     }
 
@@ -301,7 +299,7 @@ public class ElasticSearchClient implements Client {
         indexName = formatIndexName(indexName);
 
         Response response = client.getLowLevelClient()
-                                  .performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
+                                  .performRequest(new Request(HttpDelete.METHOD_NAME, "/_template/" + indexName));
         return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
     }
 
@@ -310,13 +308,13 @@ public class ElasticSearchClient implements Client {
         SearchRequest searchRequest = new SearchRequest(indexName);
         searchRequest.types(TYPE);
         searchRequest.source(searchSourceBuilder);
-        return client.search(searchRequest);
+        return client.search(searchRequest, RequestOptions.DEFAULT);
     }
 
     public GetResponse get(String indexName, String id) throws IOException {
         indexName = formatIndexName(indexName);
         GetRequest request = new GetRequest(indexName, TYPE, id);
-        return client.get(request);
+        return client.get(request, RequestOptions.DEFAULT);
     }
 
     public SearchResponse ids(String indexName, String[] ids) throws IOException {
@@ -325,13 +323,13 @@ public class ElasticSearchClient implements Client {
         SearchRequest searchRequest = new SearchRequest(indexName);
         searchRequest.types(TYPE);
         searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
-        return client.search(searchRequest);
+        return client.search(searchRequest, RequestOptions.DEFAULT);
     }
 
     public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
         IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.index(request);
+        client.index(request, RequestOptions.DEFAULT);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
@@ -339,14 +337,14 @@ public class ElasticSearchClient implements Client {
             indexName, id, source);
         request.version(version);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request);
+        client.update(request, RequestOptions.DEFAULT);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
         org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
             indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request);
+        client.update(request, RequestOptions.DEFAULT);
     }
 
     public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) {
@@ -361,12 +359,12 @@ public class ElasticSearchClient implements Client {
 
     public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
         indexName = formatIndexName(indexName);
-        Map<String, String> params = Collections.singletonMap("conflicts", "proceed");
         String jsonString = "{" + "  \"query\": {" + "    \"range\": {" + "      \"" + timeBucketColumnName + "\": {" + "        \"lte\": " + endTimeBucket + "      }" + "    }" + "  }" + "}";
         HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
-        Response response = client.getLowLevelClient()
-                                  .performRequest(
-                                      HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
+        Request request = new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query");
+        request.setEntity(entity);
+        request.addParameter("conflicts", "proceed");
+        Response response = client.getLowLevelClient().performRequest(request);
         log.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
         return response.getStatusLine().getStatusCode();
     }
@@ -377,7 +375,7 @@ public class ElasticSearchClient implements Client {
         request.waitForActiveShards(ActiveShardCount.ONE);
         try {
             int size = request.requests().size();
-            BulkResponse responses = client.bulk(request);
+            BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
             log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
         } catch (IOException e) {
             log.error(e.getMessage(), e);
@@ -387,7 +385,7 @@ public class ElasticSearchClient implements Client {
     public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
         BulkProcessor.Listener listener = createBulkListener();
 
-        return BulkProcessor.builder(client::bulkAsync, listener)
+        return BulkProcessor.builder((request, actionListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, actionListener), listener)
                             .setBulkActions(bulkActions)
                             .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
                             .setConcurrentRequests(concurrentRequests)
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml
index 109392b..3129019 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml
@@ -29,7 +29,7 @@
     <artifactId>storage-elasticsearch7-plugin</artifactId>
 
     <properties>
-        <elasticsearch.version>7.0.0</elasticsearch.version>
+        <elasticsearch.version>7.8.0</elasticsearch.version>
     </properties>
 
     <dependencyManagement>