You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/07/14 22:16:18 UTC
[skywalking] 01/02: Update Elasticsearch client version to latest
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch storage-elasticsearch-health
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit f45a1276584a2a539e5e2fedc00af2d8a15022cb
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Jul 14 10:48:58 2020 +0800
Update Elasticsearch client version to latest
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
oap-server/pom.xml | 2 +-
.../client/elasticsearch/ElasticSearchClient.java | 62 +++++++++++-----------
.../storage-elasticsearch7-plugin/pom.xml | 2 +-
3 files changed, 32 insertions(+), 34 deletions(-)
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index ee05cb4..70b66e1 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -64,7 +64,7 @@
<h2.version>1.4.196</h2.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<commons-io.version>2.6</commons-io.version>
- <elasticsearch.version>6.3.2</elasticsearch.version>
+ <elasticsearch.version>6.8.10</elasticsearch.version>
<joda-time.version>2.10.5</joda-time.version>
<kubernetes.version>8.0.0</kubernetes.version>
<hikaricp.version>3.1.0</hikaricp.version>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 5d2a90f..03193ea 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -60,11 +60,7 @@ import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -76,10 +72,16 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
@@ -131,7 +133,7 @@ public class ElasticSearchClient implements Client {
}
}
client = createClient(hosts);
- client.ping();
+ client.ping(RequestOptions.DEFAULT);
}
protected RestHighLevelClient createClient(
@@ -189,7 +191,7 @@ public class ElasticSearchClient implements Client {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
- CreateIndexResponse response = client.indices().create(request);
+ CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
@@ -200,15 +202,15 @@ public class ElasticSearchClient implements Client {
CreateIndexRequest request = new CreateIndexRequest(indexName);
Gson gson = new Gson();
request.settings(gson.toJson(settings), XContentType.JSON);
- request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON);
- CreateIndexResponse response = client.indices().create(request);
+ request.mapping(mapping);
+ CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public List<String> retrievalIndexByAliases(String aliases) throws IOException {
aliases = formatIndexName(aliases);
- Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/_alias/" + aliases);
+ Response response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases));
if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
Gson gson = new Gson();
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
@@ -245,23 +247,19 @@ public class ElasticSearchClient implements Client {
indexName = formatIndexName(indexName);
}
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
- DeleteIndexResponse response;
- response = client.indices().delete(request);
+ AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean isExistsIndex(String indexName) throws IOException {
- indexName = formatIndexName(indexName);
- GetIndexRequest request = new GetIndexRequest();
- request.indices(indexName);
- return client.indices().exists(request);
+ return client.indices().exists(new GetIndexRequest(formatIndexName(indexName)), RequestOptions.DEFAULT);
}
public boolean isExistsTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
- Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
+ Response response = client.getLowLevelClient().performRequest(new Request(HttpHead.METHOD_NAME, "/_template/" + indexName));
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
@@ -291,9 +289,9 @@ public class ElasticSearchClient implements Client {
HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON);
- Response response = client.getLowLevelClient()
- .performRequest(
- HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
+ Request request = new Request(HttpPut.METHOD_NAME, "/_template/" + indexName);
+ request.setEntity(entity);
+ Response response = client.getLowLevelClient().performRequest(request);
return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
}
@@ -301,7 +299,7 @@ public class ElasticSearchClient implements Client {
indexName = formatIndexName(indexName);
Response response = client.getLowLevelClient()
- .performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
+ .performRequest(new Request(HttpDelete.METHOD_NAME, "/_template/" + indexName));
return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
}
@@ -310,13 +308,13 @@ public class ElasticSearchClient implements Client {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source(searchSourceBuilder);
- return client.search(searchRequest);
+ return client.search(searchRequest, RequestOptions.DEFAULT);
}
public GetResponse get(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
GetRequest request = new GetRequest(indexName, TYPE, id);
- return client.get(request);
+ return client.get(request, RequestOptions.DEFAULT);
}
public SearchResponse ids(String indexName, String[] ids) throws IOException {
@@ -325,13 +323,13 @@ public class ElasticSearchClient implements Client {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
- return client.search(searchRequest);
+ return client.search(searchRequest, RequestOptions.DEFAULT);
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.index(request);
+ client.index(request, RequestOptions.DEFAULT);
}
public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
@@ -339,14 +337,14 @@ public class ElasticSearchClient implements Client {
indexName, id, source);
request.version(version);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.update(request);
+ client.update(request, RequestOptions.DEFAULT);
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.update(request);
+ client.update(request, RequestOptions.DEFAULT);
}
public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) {
@@ -361,12 +359,12 @@ public class ElasticSearchClient implements Client {
public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
indexName = formatIndexName(indexName);
- Map<String, String> params = Collections.singletonMap("conflicts", "proceed");
String jsonString = "{" + " \"query\": {" + " \"range\": {" + " \"" + timeBucketColumnName + "\": {" + " \"lte\": " + endTimeBucket + " }" + " }" + " }" + "}";
HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
- Response response = client.getLowLevelClient()
- .performRequest(
- HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
+ Request request = new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query");
+ request.setEntity(entity);
+ request.addParameter("conflicts", "proceed");
+ Response response = client.getLowLevelClient().performRequest(request);
log.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
return response.getStatusLine().getStatusCode();
}
@@ -377,7 +375,7 @@ public class ElasticSearchClient implements Client {
request.waitForActiveShards(ActiveShardCount.ONE);
try {
int size = request.requests().size();
- BulkResponse responses = client.bulk(request);
+ BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
} catch (IOException e) {
log.error(e.getMessage(), e);
@@ -387,7 +385,7 @@ public class ElasticSearchClient implements Client {
public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = createBulkListener();
- return BulkProcessor.builder(client::bulkAsync, listener)
+ return BulkProcessor.builder((request, actionListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, actionListener), listener)
.setBulkActions(bulkActions)
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
.setConcurrentRequests(concurrentRequests)
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml
index 109392b..3129019 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/pom.xml
@@ -29,7 +29,7 @@
<artifactId>storage-elasticsearch7-plugin</artifactId>
<properties>
- <elasticsearch.version>7.0.0</elasticsearch.version>
+ <elasticsearch.version>7.8.0</elasticsearch.version>
</properties>
<dependencyManagement>