You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/08/21 04:11:38 UTC
[skywalking] 01/01: Rebuilt ElasticSearch client on top of their REST API
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch feature/elasticsearch-client
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit e469ebc1651f3a311a6dc6e72a1d0277bbc82104
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Sat Aug 21 10:29:31 2021 +0800
Rebuilt ElasticSearch client on top of their REST API
---
.../storage/type/StorageDataComplexObject.java | 22 ++
oap-server/server-library/library-client/pom.xml | 6 +
.../client/elasticsearch/ElasticSearchClient.java | 295 ++++++---------------
.../elasticsearch/ElasticSearchInsertRequest.java | 26 +-
.../elasticsearch/ElasticSearchUpdateRequest.java | 27 +-
.../elasticsearch/ITElasticSearchClient.java | 40 +--
.../{ => library-elasticsearch-client}/pom.xml | 22 +-
.../library/elasticsearch/ElasticSearchClient.java | 123 +++++++++
.../elasticsearch/ElasticSearchClientBuilder.java | 150 +++++++++++
.../elasticsearch/ElasticSearchVersion.java | 80 ++++++
.../library/elasticsearch/client/AliasClient.java | 64 +++++
.../elasticsearch/client/DocumentClient.java | 119 +++++++++
.../library/elasticsearch/client/IndexClient.java | 110 ++++++++
.../elasticsearch/client/TemplateClient.java | 107 ++++++++
.../requests/factory/AliasFactory.java} | 22 +-
.../requests/factory/DocumentFactory.java | 54 ++++
.../requests/factory/IndexFactory.java} | 34 ++-
.../requests/factory/RequestFactory.java | 72 +++++
.../requests/factory/TemplateFactory.java} | 30 ++-
.../requests/factory/v6/V6AliasFactory.java} | 21 +-
.../requests/factory/v6/V6DocumentFactory.java | 104 ++++++++
.../requests/factory/v6/V6IndexFactory.java | 91 +++++++
.../requests/factory/v6/V6RequestFactory.java | 48 ++++
.../requests/factory/v6/V6TemplateFactory.java | 79 ++++++
.../library/elasticsearch/response/Document.java} | 30 +--
.../library/elasticsearch/response/Documents.java} | 29 +-
.../library/elasticsearch/response/NodeInfo.java} | 27 +-
.../library/elasticsearch/util/JsonSerializer.java | 46 ++++
.../elasticsearch/ElasticSearchClientTest.java} | 29 +-
oap-server/server-library/pom.xml | 3 +-
.../storage/plugin/elasticsearch/base/EsDAO.java | 1 +
.../elasticsearch/base/HistoryDeleteEsDAO.java | 5 +-
.../plugin/elasticsearch/base/ManagementEsDAO.java | 14 +-
.../plugin/elasticsearch/base/MetricsEsDAO.java | 22 +-
.../plugin/elasticsearch/base/NoneStreamEsDAO.java | 6 +-
.../plugin/elasticsearch/base/RecordEsDAO.java | 6 +-
.../elasticsearch/query/MetricsQueryEsDAO.java | 42 +--
.../query/UITemplateManagementEsDAO.java | 23 +-
.../client/ElasticSearch7Client.java | 40 +--
39 files changed, 1592 insertions(+), 477 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
index ebad19e..855fb15 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
@@ -18,9 +18,16 @@
package org.apache.skywalking.oap.server.core.storage.type;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import java.io.IOException;
+
/**
* StorageDataComplexObject implementation supports String-Object interconversion.
*/
+@JsonSerialize(using = StorageDataComplexObject.Serializer.class)
public interface StorageDataComplexObject<T> {
/**
* @return string representing this object.
@@ -36,4 +43,19 @@ public interface StorageDataComplexObject<T> {
* Initialize the object based on the given source.
*/
void copyFrom(T source);
+
+ class Serializer extends StdSerializer<StorageDataComplexObject<?>> {
+ protected Serializer(final Class<StorageDataComplexObject<?>> t) {
+ super(t);
+ }
+
+ @Override
+ public void serialize(
+ final StorageDataComplexObject value,
+ final JsonGenerator gen,
+ final SerializerProvider provider)
+ throws IOException {
+ gen.writeString(value.toStorageData());
+ }
+ }
}
diff --git a/oap-server/server-library/library-client/pom.xml b/oap-server/server-library/library-client/pom.xml
index d5c6b82..43c4e56 100755
--- a/oap-server/server-library/library-client/pom.xml
+++ b/oap-server/server-library/library-client/pom.xml
@@ -40,6 +40,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>library-elasticsearch-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 38e3af6..e9d5cc6 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -19,13 +19,9 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.common.base.Splitter;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.reflect.TypeToken;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyManagementException;
@@ -33,8 +29,8 @@ import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -48,55 +44,34 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
-import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.library.util.HealthChecker;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Response;
-import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
/**
@@ -122,6 +97,8 @@ public class ElasticSearchClient implements Client, HealthCheckable {
private final int connectTimeout;
private final int socketTimeout;
+ org.apache.skywalking.library.elasticsearch.ElasticSearchClient esClient;
+
public ElasticSearchClient(String clusterNodes,
String protocol,
String trustStorePath,
@@ -140,6 +117,15 @@ public class ElasticSearchClient implements Client, HealthCheckable {
this.trustStorePass = trustStorePass;
this.connectTimeout = connectTimeout;
this.socketTimeout = socketTimeout;
+ this.esClient = org.apache.skywalking.library.elasticsearch.ElasticSearchClient.builder()
+ .endpoints(clusterNodes.split(","))
+ .protocol(protocol)
+ .trustStorePath(trustStorePath)
+ .trustStorePass(trustStorePass)
+ .username(user)
+ .password(password)
+ .connectTimeout(connectTimeout)
+ .build();
}
@Override
@@ -156,6 +142,7 @@ public class ElasticSearchClient implements Client, HealthCheckable {
}
client = createClient(hosts);
client.ping();
+ esClient.connect();
} finally {
connectLock.unlock();
}
@@ -201,6 +188,7 @@ public class ElasticSearchClient implements Client, HealthCheckable {
@Override
public void shutdown() throws IOException {
client.close();
+ esClient.close();
}
public static List<HttpHost> parseClusterNodes(String protocol, String nodes) {
@@ -220,22 +208,13 @@ public class ElasticSearchClient implements Client, HealthCheckable {
public boolean createIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
- CreateIndexRequest request = new CreateIndexRequest(indexName);
- CreateIndexResponse response = client.indices().create(request);
- log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
- return response.isAcknowledged();
+ return esClient.index().create(indexName);
}
public boolean updateIndexMapping(String indexName, Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
- Map<String, Object> properties = (Map<String, Object>) mapping.get(ElasticSearchClient.TYPE);
- PutMappingRequest putMappingRequest = new PutMappingRequest(indexName);
- Gson gson = new Gson();
- putMappingRequest.type(ElasticSearchClient.TYPE);
- putMappingRequest.source(gson.toJson(properties), XContentType.JSON);
- PutMappingResponse response = client.indices().putMapping(putMappingRequest);
- log.debug("put {} index mapping finished, isAcknowledged: {}", indexName, response.isAcknowledged());
- return response.isAcknowledged();
+
+ return esClient.index().putMapping(indexName, TYPE, mapping);
}
public Map<String, Object> getIndex(String indexName) throws IOException {
@@ -244,69 +223,22 @@ public class ElasticSearchClient implements Client, HealthCheckable {
}
indexName = formatIndexName(indexName);
try {
- Response response = client.getLowLevelClient()
- .performRequest(HttpGet.METHOD_NAME, "/" + indexName);
- int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode != HttpStatus.SC_OK) {
- healthChecker.health();
- throw new IOException(
- "The response status code of template exists request should be 200, but it is " + statusCode);
+ final Map<String, Object> indices = esClient.index().get(indexName);
+ if (indices.containsKey(indexName)) {
+ // noinspection unchecked
+ return (Map<String, Object>) indices.get(indexName);
}
- Type type = new TypeToken<HashMap<String, Object>>() {
- }.getType();
- Map<String, Object> templates = new Gson().<HashMap<String, Object>>fromJson(
- new InputStreamReader(response.getEntity().getContent()),
- type
- );
- return (Map<String, Object>) Optional.ofNullable(templates.get(indexName)).orElse(new HashMap<>());
- } catch (ResponseException e) {
- if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
- return new HashMap<>();
- }
- healthChecker.unHealth(e);
- throw e;
- } catch (IOException t) {
+ return Collections.emptyMap();
+ } catch (Exception t) {
healthChecker.unHealth(t);
throw t;
}
}
- public boolean createIndex(String indexName, Map<String, Object> settings,
- Map<String, Object> mapping) throws IOException {
- indexName = formatIndexName(indexName);
- CreateIndexRequest request = new CreateIndexRequest(indexName);
- Gson gson = new Gson();
- request.settings(gson.toJson(settings), XContentType.JSON);
- request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON);
- CreateIndexResponse response = client.indices().create(request);
- log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
- return response.isAcknowledged();
- }
-
- public List<String> retrievalIndexByAliases(String aliases) throws IOException {
+ public Collection<String> retrievalIndexByAliases(String aliases) throws IOException {
aliases = formatIndexName(aliases);
- Response response;
- try {
- response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/_alias/" + aliases);
- healthChecker.health();
- } catch (Throwable t) {
- healthChecker.unHealth(t);
- throw t;
- }
- if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
- Gson gson = new Gson();
- InputStreamReader reader;
- try {
- reader = new InputStreamReader(response.getEntity().getContent());
- } catch (Throwable t) {
- healthChecker.unHealth(t);
- throw t;
- }
- JsonObject responseJson = gson.fromJson(reader, JsonObject.class);
- log.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
- return new ArrayList<>(responseJson.keySet());
- }
- return Collections.emptyList();
+
+ return esClient.alias().indices(aliases).keySet();
}
/**
@@ -334,98 +266,48 @@ public class ElasticSearchClient implements Client, HealthCheckable {
if (formatIndexName) {
indexName = formatIndexName(indexName);
}
- DeleteIndexRequest request = new DeleteIndexRequest(indexName);
- DeleteIndexResponse response;
- response = client.indices().delete(request);
- log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
- return response.isAcknowledged();
+ return esClient.index().delete(indexName);
}
public boolean isExistsIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
- GetIndexRequest request = new GetIndexRequest();
- request.indices(indexName);
- return client.indices().exists(request);
+
+ return esClient.index().exists(indexName);
}
public Map<String, Object> getTemplate(String name) throws IOException {
name = formatIndexName(name);
+
try {
- Response response = client.getLowLevelClient()
- .performRequest(HttpGet.METHOD_NAME, "/_template/" + name);
- int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode != HttpStatus.SC_OK) {
- healthChecker.health();
- throw new IOException(
- "The response status code of template exists request should be 200, but it is " + statusCode);
- }
- Type type = new TypeToken<HashMap<String, Object>>() {
- }.getType();
- Map<String, Object> templates = new Gson().<HashMap<String, Object>>fromJson(
- new InputStreamReader(response.getEntity().getContent()),
- type
- );
+ Map<String, Object> templates = esClient.templates().get(name);
if (templates.containsKey(name)) {
+ // noinspection unchecked
return (Map<String, Object>) templates.get(name);
}
- return new HashMap<>();
- } catch (ResponseException e) {
- if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
- return new HashMap<>();
- }
+ return Collections.emptyMap();
+ } catch (Exception e) {
healthChecker.unHealth(e);
throw e;
- } catch (IOException t) {
- healthChecker.unHealth(t);
- throw t;
}
}
public boolean isExistsTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
- Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
-
- int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode == HttpStatus.SC_OK) {
- return true;
- } else if (statusCode == HttpStatus.SC_NOT_FOUND) {
- return false;
- } else {
- throw new IOException(
- "The response status code of template exists request should be 200 or 404, but it is " + statusCode);
- }
+ return esClient.templates().exists(indexName);
}
public boolean createOrUpdateTemplate(String indexName, Map<String, Object> settings,
Map<String, Object> mapping, int order) throws IOException {
indexName = formatIndexName(indexName);
- String[] patterns = new String[] {indexName + "-*"};
-
- Map<String, Object> aliases = new HashMap<>();
- aliases.put(indexName, new JsonObject());
-
- Map<String, Object> template = new HashMap<>();
- template.put("index_patterns", patterns);
- template.put("aliases", aliases);
- template.put("settings", settings);
- template.put("mappings", mapping);
- template.put("order", order);
-
- HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON);
-
- Response response = client.getLowLevelClient()
- .performRequest(
- HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
- return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
+ return esClient.templates().createOrUpdate(indexName, settings, mapping, order);
}
public boolean deleteTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
- Response response = client.getLowLevelClient()
- .performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
- return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
+
+ return esClient.templates().delete(indexName);
}
public SearchResponse search(IndexNameMaker indexNameMaker,
@@ -472,40 +354,40 @@ public class ElasticSearchClient implements Client, HealthCheckable {
}
}
- public GetResponse get(String indexName, String id) throws IOException {
+ public Optional<Document> get(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
- GetRequest request = new GetRequest(indexName, TYPE, id);
- try {
- GetResponse response = client.get(request);
- healthChecker.health();
- return response;
- } catch (Throwable t) {
- healthChecker.unHealth(t);
- throw t;
- }
+
+ return esClient.documents().get(indexName, TYPE, id);
}
- public SearchResponse ids(String indexName, String[] ids) throws IOException {
+ public boolean existDoc(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
- SearchRequest searchRequest = new SearchRequest(indexName);
- searchRequest.types(TYPE);
- searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
- try {
- SearchResponse response = client.search(searchRequest);
- healthChecker.health();
- return response;
- } catch (Throwable t) {
- healthChecker.unHealth(t);
- throw t;
- }
+ return esClient.documents().exists(indexName, TYPE, id);
+ }
+
+ public Optional<Documents> ids(String indexName, Iterable<String> ids) {
+ indexName = formatIndexName(indexName);
+
+ return esClient.documents().mget(indexName, TYPE, ids);
}
- public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
- IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source);
- request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+ public Optional<Documents> ids(String indexName, String[] ids) {
+ return ids(indexName, Arrays.asList(ids));
+ }
+
+ public void forceInsert(String indexName, String id, Map<String, Object> source)
+ throws IOException {
+ ElasticSearchInsertRequest request = prepareInsert(indexName, id, source);
+ Map<String, Object> params = ImmutableMap.of("refresh", "true");
try {
- client.index(request);
+ esClient.documents().index(
+ request.getIndex(),
+ request.getType(),
+ request.getId(),
+ request.getSource(),
+ params
+ );
healthChecker.health();
} catch (Throwable t) {
healthChecker.unHealth(t);
@@ -513,12 +395,18 @@ public class ElasticSearchClient implements Client, HealthCheckable {
}
}
- public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
- org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
- indexName, id, source);
- request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+ public void forceUpdate(String indexName, String id, Map<String, Object> source)
+ throws IOException {
+ ElasticSearchUpdateRequest request = prepareUpdate(indexName, id, source);
+ Map<String, Object> params = ImmutableMap.of("refresh", "true");
try {
- client.update(request);
+ esClient.documents().update(
+ request.getIndex(),
+ request.getType(),
+ request.getId(),
+ request.getSource(),
+ params
+ );
healthChecker.health();
} catch (Throwable t) {
healthChecker.unHealth(t);
@@ -526,14 +414,15 @@ public class ElasticSearchClient implements Client, HealthCheckable {
}
}
- public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) {
+ public ElasticSearchInsertRequest prepareInsert(String indexName, String id, Map<String, Object> source) {
indexName = formatIndexName(indexName);
- return new ElasticSearchInsertRequest(indexName, TYPE, id).source(source);
+ return new ElasticSearchInsertRequest(indexName, TYPE, id, source);
}
- public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
+ public ElasticSearchUpdateRequest prepareUpdate(String indexName, String id,
+ Map<String, Object> source) {
indexName = formatIndexName(indexName);
- return new ElasticSearchUpdateRequest(indexName, TYPE, id).doc(source);
+ return new ElasticSearchUpdateRequest(indexName, TYPE, id, source);
}
public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
@@ -548,24 +437,6 @@ public class ElasticSearchClient implements Client, HealthCheckable {
return response.getStatusLine().getStatusCode();
}
- /**
- * @since 8.7.0 SkyWalking don't use sync bulk anymore. This method is just kept for unexpected case in the future.
- */
- @Deprecated
- public void synchronousBulk(BulkRequest request) {
- request.timeout(TimeValue.timeValueMinutes(2));
- request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
- request.waitForActiveShards(ActiveShardCount.ONE);
- try {
- int size = request.requests().size();
- BulkResponse responses = client.bulk(request);
- log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
- healthChecker.health();
- } catch (Throwable t) {
- healthChecker.unHealth(t);
- }
- }
-
public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = createBulkListener();
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchInsertRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchInsertRequest.java
index 9c0655c..2dd4851 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchInsertRequest.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchInsertRequest.java
@@ -17,19 +17,25 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
+import java.util.Map;
+import lombok.Getter;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-public class ElasticSearchInsertRequest extends IndexRequest implements InsertRequest {
+@Getter
+public class ElasticSearchInsertRequest implements InsertRequest {
+ private final String index;
- public ElasticSearchInsertRequest(String index, String type, String id) {
- super(index, type, id);
- }
+ private final String type;
+
+ private final String id;
+
+ private final Map<String, Object> source;
- @Override
- public ElasticSearchInsertRequest source(XContentBuilder sourceBuilder) {
- super.source(sourceBuilder);
- return this;
+ public ElasticSearchInsertRequest(String index, String type, String id,
+ Map<String, Object> source) {
+ this.index = index;
+ this.type = type;
+ this.id = id;
+ this.source = source;
}
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java
index 2663856..c1c9ef3 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java
@@ -17,18 +17,25 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.common.xcontent.XContentBuilder;
+import java.util.Map;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-public class ElasticSearchUpdateRequest extends UpdateRequest implements org.apache.skywalking.oap.server.library.client.request.UpdateRequest {
+@Getter
+public class ElasticSearchUpdateRequest implements UpdateRequest {
+ private final String index;
- public ElasticSearchUpdateRequest(String index, String type, String id) {
- super(index, type, id);
- }
+ private final String type;
+
+ private final String id;
+
+ private final Map<String, Object> source;
- @Override
- public ElasticSearchUpdateRequest doc(XContentBuilder source) {
- super.doc(source);
- return this;
+ public ElasticSearchUpdateRequest(String index, String type, String id,
+ Map<String, Object> source) {
+ this.index = index;
+ this.type = type;
+ this.id = id;
+ this.source = source;
}
}
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
index ae9a9a2..4f7f4fc 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
@@ -18,26 +18,27 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
+import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.methods.HttpGet;
import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.library.elasticsearch.response.Document;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.After;
@@ -95,7 +96,7 @@ public class ITElasticSearchClient {
properties.add("column1", column);
String indexName = "test_index_operate";
- client.createIndex(indexName, settings, doc);
+ // client.createIndex(indexName, settings, doc); // TODO
Assert.assertTrue(client.isExistsIndex(indexName));
JsonObject index = getIndex(indexName);
@@ -127,26 +128,25 @@ public class ITElasticSearchClient {
public void documentOperate() throws IOException {
String id = String.valueOf(System.currentTimeMillis());
- XContentBuilder builder = XContentFactory.jsonBuilder()
- .startObject()
- .field("user", "kimchy")
- .field("post_date", "2009-11-15T14:12:12")
- .field("message", "trying out Elasticsearch")
- .endObject();
+ Map<String, Object> builder = ImmutableMap.<String, Object>builder()
+ .put("user", "kimchy")
+ .put("post_date", "2009-11-15T14:12:12")
+ .put("message", "trying out Elasticsearch")
+ .build();
String indexName = "test_document_operate";
client.forceInsert(indexName, id, builder);
- GetResponse response = client.get(indexName, id);
- Assert.assertEquals("kimchy", response.getSource().get("user"));
- Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message"));
+ Optional<Document> response = client.get(indexName, id);
+ Assert.assertEquals("kimchy", response.get().getSource().get("user"));
+ Assert.assertEquals("trying out Elasticsearch", response.get().getSource().get("message"));
- builder = XContentFactory.jsonBuilder().startObject().field("user", "pengys").endObject();
+ builder = ImmutableMap.<String, Object>builder().put("user", "pengys").build();
client.forceUpdate(indexName, id, builder);
response = client.get(indexName, id);
- Assert.assertEquals("pengys", response.getSource().get("user"));
- Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message"));
+ Assert.assertEquals("pengys", response.get().getSource().get("user"));
+ Assert.assertEquals("trying out Elasticsearch", response.get().getSource().get("message"));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("user", "pengys"));
@@ -180,7 +180,7 @@ public class ITElasticSearchClient {
Assert.assertTrue(client.isExistsTemplate(indexName));
- XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("name", "pengys").endObject();
+ Map<String, Object> builder = ImmutableMap.of("name", "pengys");
client.forceInsert(indexName + "-2019", "testid", builder);
JsonObject index = getIndex(indexName + "-2019");
LOGGER.info(index.toString());
@@ -235,12 +235,12 @@ public class ITElasticSearchClient {
client.createOrUpdateTemplate(indexName, new HashMap<>(), mapping, 0);
- XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("name", "pengys").endObject();
+ Map<String, Object> builder = ImmutableMap.of("name", "pengys");
client.forceInsert(timeSeriesIndexName, "testid", builder);
- List<String> indexes = client.retrievalIndexByAliases(indexName);
+ Collection<String> indexes = client.retrievalIndexByAliases(indexName);
Assert.assertEquals(1, indexes.size());
- String index = indexes.get(0);
+ String index = indexes.iterator().next();
Assert.assertTrue(client.deleteByIndexName(index));
Assert.assertFalse(client.isExistsIndex(timeSeriesIndexName));
client.deleteTemplate(indexName);
diff --git a/oap-server/server-library/pom.xml b/oap-server/server-library/library-elasticsearch-client/pom.xml
similarity index 64%
copy from oap-server/server-library/pom.xml
copy to oap-server/server-library/library-elasticsearch-client/pom.xml
index acfcd44..41ea10f 100644
--- a/oap-server/server-library/pom.xml
+++ b/oap-server/server-library/library-elasticsearch-client/pom.xml
@@ -17,27 +17,23 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>oap-server</artifactId>
+ <artifactId>server-library</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>server-library</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>library-module</module>
- <module>library-server</module>
- <module>library-util</module>
- <module>library-client</module>
- </modules>
+ <artifactId>library-elasticsearch-client</artifactId>
<dependencies>
<dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>apm-util</artifactId>
+ <groupId>com.linecorp.armeria</groupId>
+ <artifactId>armeria</artifactId>
+ <version>1.10.0</version>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClient.java
new file mode 100644
index 0000000..b73d642
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch;
+
+import com.linecorp.armeria.client.ClientFactory;
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.client.WebClientBuilder;
+import com.linecorp.armeria.client.endpoint.EndpointGroup;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import com.linecorp.armeria.common.SessionProtocol;
+import com.linecorp.armeria.common.auth.BasicToken;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.library.elasticsearch.client.AliasClient;
+import org.apache.skywalking.library.elasticsearch.client.DocumentClient;
+import org.apache.skywalking.library.elasticsearch.client.IndexClient;
+import org.apache.skywalking.library.elasticsearch.client.TemplateClient;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.response.NodeInfo;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+@Slf4j
+public final class ElasticSearchClient implements AutoCloseable { // entry point exposing template/index/document/alias sub-clients
+ private final WebClient client;
+
+ private final ClientFactory clientFactory;
+
+ private final CompletableFuture<RequestFactory> requestFactory; // completed by connect(); shared with all sub-clients
+
+ private final TemplateClient templateClient;
+
+ private final IndexClient indexClient;
+
+ private final DocumentClient documentClient;
+
+ private final AliasClient aliasClient;
+
+ ElasticSearchClient(SessionProtocol protocol,
+ String username, String password,
+ EndpointGroup endpointGroup,
+ ClientFactory clientFactory) {
+ this.clientFactory = clientFactory;
+ this.requestFactory = new CompletableFuture<>();
+
+ final WebClientBuilder builder = WebClient.builder(protocol, endpointGroup);
+ if (StringUtil.isNotBlank(username) && StringUtil.isNotBlank(password)) { // basic auth only when both are non-blank
+ builder.auth(BasicToken.of(username, password));
+ }
+ builder.factory(clientFactory);
+ client = builder.build();
+
+ templateClient = new TemplateClient(requestFactory, client);
+ documentClient = new DocumentClient(requestFactory, client);
+ indexClient = new IndexClient(requestFactory, client);
+ aliasClient = new AliasClient(requestFactory, client);
+ }
+
+ public static ElasticSearchClientBuilder builder() {
+ return new ElasticSearchClientBuilder();
+ }
+
+ public void connect() { // blocks until the version probe has finished, successfully or not
+ client.get("/").aggregate().thenAcceptAsync(response -> { // GET "/" and read the version number from the response
+ final HttpStatus status = response.status();
+ if (status != HttpStatus.OK) {
+ throw new RuntimeException("Failed to connect to ElasticSearch server: " + status);
+ }
+ try (final HttpData content = response.content();
+ final InputStream is = content.toInputStream()) {
+ final NodeInfo node = JsonSerializer.MAPPER.readValue(is, NodeInfo.class);
+ final String v = node.getVersion().getNumber();
+ final ElasticSearchVersion version = ElasticSearchVersion.from(v);
+ requestFactory.complete(RequestFactory.of(version)); // unblocks requests chained on requestFactory
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).exceptionally(throwable -> { // any failure above is logged and forwarded to requestFactory
+ log.error("Failed to determine ElasticSearch version", throwable);
+ requestFactory.completeExceptionally(throwable);
+ return null; // the error is swallowed here, so the join() below does not rethrow it
+ }).join();
+ }
+
+ public TemplateClient templates() {
+ return templateClient;
+ }
+
+ public DocumentClient documents() {
+ return documentClient;
+ }
+
+ public IndexClient index() {
+ return indexClient;
+ }
+
+ public AliasClient alias() {
+ return aliasClient;
+ }
+
+ @Override
+ public void close() {
+ clientFactory.close(); // closes the ClientFactory handed to the constructor
+ }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientBuilder.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientBuilder.java
new file mode 100644
index 0000000..a9500c2
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch;
+
+import com.google.common.collect.ImmutableList;
+import com.linecorp.armeria.client.ClientFactory;
+import com.linecorp.armeria.client.ClientFactoryBuilder;
+import com.linecorp.armeria.client.Endpoint;
+import com.linecorp.armeria.client.endpoint.EndpointGroup;
+import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
+import com.linecorp.armeria.common.SessionProtocol;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.net.ssl.TrustManagerFactory;
+import lombok.SneakyThrows;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
+
+public final class ElasticSearchClientBuilder { // fluent builder for ElasticSearchClient
+ private SessionProtocol protocol = SessionProtocol.HTTP;
+
+ private String username;
+
+ private String password;
+
+ private Duration healthCheckRetryInterval = Duration.ofSeconds(30);
+
+ private final ImmutableList.Builder<String> endpoints = ImmutableList.builder();
+
+ private String trustStorePath; // optional; when blank no custom trust manager is installed
+
+ private String trustStorePass; // optional; may stay null when the trust store has no password
+
+ private Duration connectTimeout = Duration.ofMillis(500);
+
+ public ElasticSearchClientBuilder protocol(String protocol) {
+ checkArgument(isNotBlank(protocol), "protocol cannot be blank");
+ this.protocol = SessionProtocol.of(protocol);
+ return this;
+ }
+
+ public ElasticSearchClientBuilder username(String username) {
+ this.username = requireNonNull(username, "username");
+ return this;
+ }
+
+ public ElasticSearchClientBuilder password(String password) {
+ this.password = requireNonNull(password, "password");
+ return this;
+ }
+
+ public ElasticSearchClientBuilder endpoints(Iterable<String> endpoints) { // appends to previously added endpoints
+ requireNonNull(endpoints, "endpoints");
+ this.endpoints.addAll(endpoints);
+ return this;
+ }
+
+ public ElasticSearchClientBuilder endpoints(String... endpoints) {
+ return endpoints(Arrays.asList(endpoints));
+ }
+
+ public ElasticSearchClientBuilder healthCheckRetryInterval(Duration healthCheckRetryInterval) {
+ requireNonNull(healthCheckRetryInterval, "healthCheckRetryInterval");
+ this.healthCheckRetryInterval = healthCheckRetryInterval;
+ return this;
+ }
+
+ public ElasticSearchClientBuilder trustStorePath(String trustStorePath) {
+ requireNonNull(trustStorePath, "trustStorePath");
+ this.trustStorePath = trustStorePath;
+ return this;
+ }
+
+ public ElasticSearchClientBuilder trustStorePass(String trustStorePass) {
+ requireNonNull(trustStorePass, "trustStorePass");
+ this.trustStorePass = trustStorePass;
+ return this;
+ }
+
+ public ElasticSearchClientBuilder connectTimeout(int connectTimeout) { // value is in milliseconds
+ checkArgument(connectTimeout > 0, "connectTimeout must be positive");
+ this.connectTimeout = Duration.ofMillis(connectTimeout);
+ return this;
+ }
+
+ @SneakyThrows
+ public ElasticSearchClient build() { // checked exceptions from key store loading are rethrown via @SneakyThrows
+ final List<Endpoint> endpoints =
+ this.endpoints.build().stream().filter(StringUtil::isNotBlank).map(it -> { // blank entries are dropped
+ final String[] parts = it.split(":", 2); // "host:port" or a bare "host"
+ if (parts.length == 2) {
+ return Endpoint.of(parts[0], Integer.parseInt(parts[1]));
+ }
+ return Endpoint.of(parts[0]);
+ }).collect(Collectors.toList());
+ final HealthCheckedEndpointGroup endpointGroup =
+ HealthCheckedEndpointGroup.builder(EndpointGroup.of(endpoints), "_cluster/health")
+ .protocol(protocol)
+ .useGet(true)
+ .retryInterval(healthCheckRetryInterval)
+ .build();
+ final ClientFactoryBuilder factoryBuilder = ClientFactory.builder();
+ factoryBuilder.connectTimeout(connectTimeout);
+
+ if (StringUtil.isNotBlank(trustStorePath)) {
+ final TrustManagerFactory trustManagerFactory =
+ TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ final KeyStore truststore = KeyStore.getInstance("jks");
+ try (final InputStream is = Files.newInputStream(Paths.get(trustStorePath))) { // KeyStore.load accepts a null password
+ truststore.load(is, trustStorePass == null ? null : trustStorePass.toCharArray());
+ }
+ trustManagerFactory.init(truststore);
+
+ factoryBuilder.tlsCustomizer(
+ sslContextBuilder -> sslContextBuilder.trustManager(trustManagerFactory));
+ }
+
+ return new ElasticSearchClient(
+ protocol,
+ username,
+ password,
+ endpointGroup,
+ factoryBuilder.build()
+ );
+ }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchVersion.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchVersion.java
new file mode 100644
index 0000000..777dc5c
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchVersion.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch;
+
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public final class ElasticSearchVersion implements Comparable<ElasticSearchVersion> { // immutable major.minor version value
+ public static final ElasticSearchVersion UNKNOWN = new ElasticSearchVersion(-1, -1); // compares lower than every parsed version
+
+ public static final ElasticSearchVersion V6_0 = new ElasticSearchVersion(6, 0);
+
+ public static final ElasticSearchVersion V7_0 = new ElasticSearchVersion(7, 0);
+
+ public static final ElasticSearchVersion V8_0 = new ElasticSearchVersion(8, 0);
+
+ private final int major;
+
+ private final int minor;
+
+ @Override
+ public int compareTo(final ElasticSearchVersion o) { // orders by major first, then by minor
+ if (major != o.major) {
+ return Integer.compare(major, o.major);
+ }
+ return Integer.compare(minor, o.minor);
+ }
+
+ @Override
+ public boolean equals(final Object o) { // consistent with compareTo: equal iff major and minor both match
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ElasticSearchVersion that = (ElasticSearchVersion) o;
+ return major == that.major && minor == that.minor;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(major, minor);
+ }
+
+ @Override
+ public String toString() { // rendered as "<major>.<minor>"
+ return major + "." + minor;
+ }
+
+ private static final Pattern REGEX = Pattern.compile("(\\d+)\\.(\\d+).*"); // anything after the minor number is ignored
+
+ public static ElasticSearchVersion from(String version) { // parses strings that start with "<digits>.<digits>"
+ final Matcher matcher = REGEX.matcher(version);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Failed to parse version: " + version);
+ }
+ final int major = Integer.parseInt(matcher.group(1));
+ final int minor = Integer.parseInt(matcher.group(2));
+ return new ElasticSearchVersion(major, minor);
+ }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/AliasClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/AliasClient.java
new file mode 100644
index 0000000..8b125f0
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/AliasClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.client;
+
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+@Slf4j
+@RequiredArgsConstructor
+public final class AliasClient {
+ private final CompletableFuture<RequestFactory> requestFactory;
+
+ private final WebClient client;
+
+ @SneakyThrows
+ public Map<String, Object> indices(String name) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.alias().indices(name))
+ .aggregate().thenApply(response -> {
+ final HttpStatus status = response.status();
+ if (status != HttpStatus.OK) {
+ throw new RuntimeException("Failed to get alias indices: " + status);
+ }
+
+ try (final HttpData content = response.content();
+ final InputStream is = content.toInputStream()) {
+ return JsonSerializer.parse(is);
+ } catch (IOException e) {
+ log.error("Failed to close input stream", e);
+ return Collections.<String, Object>emptyMap();
+ }
+
+ }).exceptionally(e -> {
+ log.error("Failed to check whether index exists", e);
+ return Collections.emptyMap();
+ })).get();
+ }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
new file mode 100644
index 0000000..8f12521
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/DocumentClient.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.client;
+
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.Documents;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+@Slf4j
+@RequiredArgsConstructor
+public final class DocumentClient { // issues document requests built by the version-specific RequestFactory
+ private final CompletableFuture<RequestFactory> requestFactory;
+
+ private final WebClient client;
+
+ @SneakyThrows
+ public boolean exists(String index, String type, String id) { // true only on HTTP 200; request failures are logged and reported as false
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.document().exist(index, type, id))
+ .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+ .exceptionally(e -> {
+ log.error("Failed to check whether document exists", e);
+ return false;
+ })).get();
+ }
+
+ @SneakyThrows
+ public Optional<Document> get(String index, String type, String id) { // empty when the status is not 200 or an IOException occurs
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.document().get(index, type, id))
+ .aggregate().thenApply(response -> {
+ if (response.status() != HttpStatus.OK) {
+ return Optional.<Document>empty();
+ }
+
+ try (final HttpData content = response.content();
+ final InputStream is = content.toInputStream()) {
+ return Optional.of(JsonSerializer.parse(is, Document.class));
+ } catch (IOException e) {
+ log.error("Failed to close input stream", e);
+ return Optional.<Document>empty();
+ }
+ })).get(); // no exceptionally stage: request failures propagate to the caller
+ }
+
+ @SneakyThrows
+ public Optional<Documents> mget(String index, String type, Iterable<String> ids) { // empty when the status is not 200 or an IOException occurs
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.document().mget(index, type, ids))
+ .aggregate().thenApply(response -> {
+ if (response.status() != HttpStatus.OK) {
+ return Optional.<Documents>empty();
+ }
+
+ try (final HttpData content = response.content();
+ final InputStream is = content.toInputStream()) {
+ return Optional.of(JsonSerializer.parse(is, Documents.class));
+ } catch (IOException e) {
+ log.error("Failed to close input stream", e);
+ return Optional.<Documents>empty();
+ }
+ })).get(); // no exceptionally stage: request failures propagate to the caller
+ }
+
+ @SneakyThrows
+ public void index(String index, String type, String id,
+ Map<String, Object> doc,
+ Map<String, Object> params) {
+ requestFactory.thenCompose(
+ rf -> client.execute(rf.document().index(index, type, id, doc, params))
+ .aggregate().thenAccept(response -> {
+ final HttpStatus status = response.status();
+ if (status != HttpStatus.CREATED && status != HttpStatus.OK) { // both 201 and 200 count as success
+ throw new RuntimeException("Failed to index doc: " + status);
+ }
+ })).join(); // blocks; a failure surfaces to the caller through join()
+ }
+
+ @SneakyThrows
+ public void update(String index, String type, String id,
+ Map<String, Object> doc,
+ Map<String, Object> params) {
+ requestFactory.thenCompose(
+ rf -> client.execute(rf.document().update(index, type, id, doc, params))
+ .aggregate().thenAccept(response -> {
+ final HttpStatus status = response.status();
+ if (status != HttpStatus.OK) {
+ throw new RuntimeException("Failed to update doc: " + status);
+ }
+ })).join(); // blocks; a failure surfaces to the caller through join()
+ }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/IndexClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/IndexClient.java
new file mode 100644
index 0000000..c7ed047
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/IndexClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.client;
+
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+@Slf4j
+@RequiredArgsConstructor
+public final class IndexClient {
+ private final CompletableFuture<RequestFactory> requestFactory;
+
+ private final WebClient client;
+
+ @SneakyThrows
+ public boolean exists(String name) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.index().exists(name))
+ .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+ .exceptionally(e -> {
+ log.error("Failed to check whether index exists", e);
+ return false;
+ })).get();
+ }
+
+ @SneakyThrows
+ public Map<String, Object> get(String name) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.index().get(name))
+ .aggregate().thenApply(response -> {
+ final HttpStatus status = response.status();
+ if (status != HttpStatus.OK) {
+ throw new RuntimeException("Failed to get index: " + status);
+ }
+
+ try (final HttpData content = response.content();
+ final InputStream is = content.toInputStream()) {
+ return JsonSerializer.parse(is);
+ } catch (IOException e) {
+ log.error("Failed to close input stream", e);
+ return Collections.<String, Object>emptyMap();
+ }
+ }).exceptionally(e -> {
+ log.error("Failed to check whether index exists", e);
+ return Collections.emptyMap();
+ })).get();
+ }
+
+ @SneakyThrows
+ public boolean create(String name) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.index().create(name))
+ .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+ .exceptionally(e -> {
+ log.error("Failed to check whether index exists", e);
+ return false;
+ })).get();
+ }
+
+ @SneakyThrows
+ public boolean delete(String name) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.index().delete(name))
+ .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+ .exceptionally(e -> {
+ log.error("Failed to delete whether index exists", e);
+ return false;
+ })).get();
+ }
+
+ @SneakyThrows
+ public boolean putMapping(String name, String type,
+ Map<String, Object> mapping) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.index().putMapping(name, type, mapping))
+ .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+ .exceptionally(e -> {
+ log.error("Failed to update index mapping", e);
+ return false;
+ })).get();
+ }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/TemplateClient.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/TemplateClient.java
new file mode 100644
index 0000000..4f5879a
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/client/TemplateClient.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.client;
+
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpStatus;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+@Slf4j
+@RequiredArgsConstructor
+public final class TemplateClient {
+ private final CompletableFuture<RequestFactory> requestFactory;
+
+ private final WebClient client;
+
+ @SneakyThrows
+ public boolean exists(String name) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.template().exists(name))
+ .aggregate().thenApply(response -> {
+ final HttpStatus status = response.status();
+ if (status == HttpStatus.OK) {
+ return true;
+ } else if (status == HttpStatus.NOT_FOUND) {
+ return false;
+ }
+ throw new RuntimeException(
+ "Response status code of template exists request should be 200 or 404," +
+ " but it was: " + status.code());
+ }).exceptionally(e -> {
+ log.error("Failed to check whether template exists", e);
+ return false;
+ })).get();
+ }
+
+ @SneakyThrows
+ public Map<String, Object> get(String name) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.template().get(name))
+ .aggregate().thenApply(response -> {
+ final HttpStatus status = response.status();
+ if (status != HttpStatus.OK) {
+ throw new RuntimeException("Failed to get template: " + status);
+ }
+
+ try (final HttpData content = response.content();
+ final InputStream is = content.toInputStream()) {
+ return JsonSerializer.parse(is);
+ } catch (IOException e) {
+ log.error("Failed to close input stream", e);
+ return Collections.<String, Object>emptyMap();
+ }
+ }).exceptionally(e -> {
+ log.error("Failed to check whether template exists", e);
+ return Collections.emptyMap();
+ })).get();
+ }
+
+ @SneakyThrows
+ public boolean delete(String name) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.template().delete(name))
+ .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+ .exceptionally(e -> {
+ log.error("Failed to check whether template exists", e);
+ return false;
+ })).get();
+ }
+
+ @SneakyThrows
+ public boolean createOrUpdate(String name, Map<String, Object> settings,
+ Map<String, Object> mapping, int order) {
+ return requestFactory.thenCompose(
+ rf -> client.execute(rf.template().createOrUpdate(name, settings, mapping, order))
+ .aggregate().thenApply(response -> response.status() == HttpStatus.OK)
+ .exceptionally(e -> {
+ log.error("Failed to create or update template", e);
+ return false;
+ })).get();
+ }
+
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/AliasFactory.java
similarity index 62%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/AliasFactory.java
index ebad19e..6ccf6b8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/AliasFactory.java
@@ -13,27 +13,15 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.type;
+package org.apache.skywalking.library.elasticsearch.requests.factory;
-/**
- * StorageDataComplexObject implementation supports String-Object interconversion.
- */
-public interface StorageDataComplexObject<T> {
- /**
- * @return string representing this object.
- */
- String toStorageData();
-
- /**
- * Initialize this object based on the given string data.
- */
- void toObject(String data);
+import com.linecorp.armeria.common.HttpRequest;
+public interface AliasFactory { // Composes the HTTP requests of the alias APIs.
/**
- * Initialize the object based on the given source.
+ * Returns a request to list all indices behind the {@code alias}.
*/
- void copyFrom(T source);
+ HttpRequest indices(String alias); // the V6 implementation issues GET /_alias/{name}
}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java
new file mode 100644
index 0000000..715edf1
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/DocumentFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.factory;
+
+import com.linecorp.armeria.common.HttpRequest;
+import java.util.Map;
+
+/**
+ * Composes the HTTP requests of the document APIs. Implementations encode the request
+ * syntax of a specific ElasticSearch version.
+ */
+public interface DocumentFactory {
+    /**
+     * Builds the request that checks whether a document is present in {@code index}.
+     *
+     * @param index the index to look into.
+     * @param type  the type of the document.
+     * @param id    the id of the document.
+     * @return the request to send.
+     */
+    HttpRequest exist(String index, String type, String id);
+
+    /**
+     * Builds the request that fetches the document of {@code id} from {@code index}.
+     *
+     * @param index the index to read from.
+     * @param type  the type of the document.
+     * @param id    the id of the document.
+     * @return the request to send.
+     */
+    HttpRequest get(String index, String type, String id);
+
+    /**
+     * Builds the request that fetches all documents of {@code ids} from {@code index} at
+     * once.
+     *
+     * @param index the index to read from.
+     * @param type  the type of the documents.
+     * @param ids   the ids of the documents.
+     * @return the request to send.
+     */
+    HttpRequest mget(String index, String type, Iterable<String> ids);
+
+    /**
+     * Builds the request that indexes {@code doc} under {@code id} in {@code index}.
+     *
+     * @param index  the index to write to.
+     * @param type   the type of the document.
+     * @param id     the id of the document.
+     * @param doc    the content of the document.
+     * @param params additional query parameters of the request; the V6 implementation
+     *               accepts {@code null} here.
+     * @return the request to send.
+     */
+    HttpRequest index(String index, String type, String id,
+                      Map<String, Object> doc, Map<String, Object> params);
+
+    /**
+     * Builds the request that updates the document of {@code id} in {@code index} with
+     * the content {@code doc}.
+     *
+     * @param index  the index to write to.
+     * @param type   the type of the document.
+     * @param id     the id of the document.
+     * @param doc    the content to update the document with.
+     * @param params additional query parameters of the request; the V6 implementation
+     *               accepts {@code null} here.
+     * @return the request to send.
+     */
+    HttpRequest update(String index, String type, String id,
+                       Map<String, Object> doc, Map<String, Object> params);
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/IndexFactory.java
similarity index 50%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/IndexFactory.java
index ebad19e..35e9567 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/IndexFactory.java
@@ -13,27 +13,37 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.type;
+package org.apache.skywalking.library.elasticsearch.requests.factory;
+
+import com.linecorp.armeria.common.HttpRequest;
+import java.util.Map;
+
+public interface IndexFactory { // Composes the HTTP requests of the index APIs.
+ /**
+ * Returns a request to check whether the {@code index} exists or not.
+ */
+ HttpRequest exists(String index); // the V6 implementation issues HEAD /{index}
+
+ /**
+ * Returns a request to get an index of name {@code index}.
+ */
+ HttpRequest get(String index); // the V6 implementation issues GET /{index}
-/**
- * StorageDataComplexObject implementation supports String-Object interconversion.
- */
-public interface StorageDataComplexObject<T> {
/**
- * @return string representing this object.
+ * Returns a request to create an index of name {@code index}.
*/
- String toStorageData();
+ HttpRequest create(String index); // the V6 implementation issues PUT /{index} without a body
/**
- * Initialize this object based on the given string data.
+ * Returns a request to delete an index of name {@code index}.
*/
- void toObject(String data);
+ HttpRequest delete(String index); // the V6 implementation issues DELETE /{index}
/**
- * Initialize the object based on the given source.
+ * Returns a request to update the {@code mapping} of an index of name {@code index}.
*/
- void copyFrom(T source);
+ HttpRequest putMapping(String index, String type,
+ Map<String, Object> mapping); // the V6 implementation sends mapping.get(type) as the body
}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/RequestFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/RequestFactory.java
new file mode 100644
index 0000000..c00812a
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/RequestFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.factory;
+
+import org.apache.skywalking.library.elasticsearch.ElasticSearchVersion;
+import org.apache.skywalking.library.elasticsearch.requests.factory.v6.V6RequestFactory;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.skywalking.library.elasticsearch.ElasticSearchVersion.V6_0;
+import static org.apache.skywalking.library.elasticsearch.ElasticSearchVersion.V7_0;
+import static org.apache.skywalking.library.elasticsearch.ElasticSearchVersion.V8_0;
+
+/**
+ * Entry point to the factories that compose the HTTP requests sent to ElasticSearch.
+ */
+public interface RequestFactory {
+    /**
+     * Selects the {@link RequestFactory} that composes requests in the syntax of the given
+     * {@link ElasticSearchVersion}.
+     *
+     * @param version the version of the ElasticSearch server, must not be {@code null}.
+     * @return the factory serving that version.
+     * @throws NullPointerException          if {@code version} is {@code null}.
+     * @throws UnsupportedOperationException if {@code version} is lower than
+     *                                       {@code V6_0} or not lower than {@code V8_0}.
+     */
+    static RequestFactory of(ElasticSearchVersion version) {
+        requireNonNull(version, "version");
+
+        final boolean isV6 = version.compareTo(V6_0) >= 0 && version.compareTo(V7_0) < 0;
+        final boolean isV7 = version.compareTo(V7_0) >= 0 && version.compareTo(V8_0) < 0;
+        // Both ranges are served by the same factory for now.
+        if (isV6 || isV7) {
+            return V6RequestFactory.INSTANCE;
+        }
+
+        throw new UnsupportedOperationException("Version " + version + " is not supported.");
+    }
+
+    /**
+     * Gives access to the factory composing template-related requests.
+     *
+     * @see TemplateFactory
+     */
+    TemplateFactory template();
+
+    /**
+     * Gives access to the factory composing index-related requests.
+     *
+     * @see IndexFactory
+     */
+    IndexFactory index();
+
+    /**
+     * Gives access to the factory composing alias-related requests.
+     *
+     * @see AliasFactory
+     */
+    AliasFactory alias();
+
+    /**
+     * Gives access to the factory composing document-related requests.
+     *
+     * @see DocumentFactory
+     */
+    DocumentFactory document();
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/TemplateFactory.java
similarity index 52%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/TemplateFactory.java
index ebad19e..9726f1f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/TemplateFactory.java
@@ -13,27 +13,33 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.type;
+package org.apache.skywalking.library.elasticsearch.requests.factory;
+
+import com.linecorp.armeria.common.HttpRequest;
+import java.util.Map;
+
+public interface TemplateFactory { // Composes the HTTP requests of the template APIs.
+ /**
+ * Returns a request to check whether the template exists or not.
+ */
+ HttpRequest exists(String name); // the V6 implementation issues GET /_template/{name}
-/**
- * StorageDataComplexObject implementation supports String-Object interconversion.
- */
-public interface StorageDataComplexObject<T> {
/**
- * @return string representing this object.
+ * Returns a request to get a template of {@code name}.
*/
- String toStorageData();
+ HttpRequest get(String name); // the V6 implementation issues GET /_template/{name}
/**
- * Initialize this object based on the given string data.
+ * Returns a request to delete a template of {@code name}.
*/
- void toObject(String data);
+ HttpRequest delete(String name); // the V6 implementation issues DELETE /_template/{name}
/**
- * Initialize the object based on the given source.
+ * Returns a request to create or update a template of {@code name} with the given {@code
+ * settings}, {@code mapping} and {@code order}.
*/
- void copyFrom(T source);
+ HttpRequest createOrUpdate(String name, Map<String, Object> settings,
+ Map<String, Object> mapping, int order); // V6: index pattern is name + "-*", alias is name
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6AliasFactory.java
similarity index 59%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java
copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6AliasFactory.java
index 2663856..663d27d 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchUpdateRequest.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6AliasFactory.java
@@ -15,20 +15,19 @@
* limitations under the License.
*/
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
+package org.apache.skywalking.library.elasticsearch.requests.factory.v6;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.common.xcontent.XContentBuilder;
+import com.linecorp.armeria.common.HttpRequest;
+import org.apache.skywalking.library.elasticsearch.requests.factory.AliasFactory;
-public class ElasticSearchUpdateRequest extends UpdateRequest implements org.apache.skywalking.oap.server.library.client.request.UpdateRequest {
-
- public ElasticSearchUpdateRequest(String index, String type, String id) {
- super(index, type, id);
- }
+class V6AliasFactory implements AliasFactory { // Builds the alias-related requests.
+ static final V6AliasFactory INSTANCE = new V6AliasFactory(); // shared instance handed out by V6RequestFactory
@Override
- public ElasticSearchUpdateRequest doc(XContentBuilder source) {
- super.doc(source);
- return this;
+ public HttpRequest indices(final String alias) { // builds GET /_alias/{name} with name = alias
+ return HttpRequest.builder()
+ .get("/_alias/{name}")
+ .pathParam("name", alias)
+ .build();
}
}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java
new file mode 100644
index 0000000..17bba61
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6DocumentFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.factory.v6;
+
+import com.google.common.collect.ImmutableMap;
+import com.linecorp.armeria.common.HttpRequest;
+import com.linecorp.armeria.common.HttpRequestBuilder;
+import com.linecorp.armeria.common.MediaType;
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.apache.skywalking.library.elasticsearch.requests.factory.DocumentFactory;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+/**
+ * {@link DocumentFactory} composing document requests in the typed
+ * {@code /{index}/{type}/{id}} URL layout.
+ *
+ * <p>Spaces in the {@code index} and {@code id} path parameters are written as
+ * {@code %20} before they are placed into the path.
+ */
+class V6DocumentFactory implements DocumentFactory {
+    static final V6DocumentFactory INSTANCE = new V6DocumentFactory();
+
+    /**
+     * Builds {@code HEAD /{index}/{type}/{id}}.
+     */
+    @Override
+    public HttpRequest exist(final String index, final String type, final String id) {
+        return HttpRequest.builder()
+                          .head("/{index}/{type}/{id}")
+                          .pathParam("index", escapeSpaces(index))
+                          .pathParam("type", type)
+                          // Escaped like in get/index/update, so that an id containing
+                          // spaces addresses the same document in all requests.
+                          .pathParam("id", escapeSpaces(id))
+                          .build();
+    }
+
+    /**
+     * Builds {@code GET /{index}/{type}/{id}}.
+     */
+    @Override
+    public HttpRequest get(String index, String type, String id) {
+        return HttpRequest.builder()
+                          .get("/{index}/{type}/{id}")
+                          .pathParam("index", escapeSpaces(index))
+                          .pathParam("type", type)
+                          .pathParam("id", escapeSpaces(id))
+                          .build();
+    }
+
+    /**
+     * Builds {@code GET /{index}/{type}/_mget} whose JSON body is {@code {"ids": [...]}}.
+     */
+    @SneakyThrows
+    @Override
+    public HttpRequest mget(String index, String type, Iterable<String> ids) {
+        final Map<String, Iterable<String>> body = ImmutableMap.of("ids", ids);
+        final byte[] content = JsonSerializer.MAPPER.writeValueAsBytes(body);
+        return HttpRequest.builder()
+                          .get("/{index}/{type}/_mget")
+                          .pathParam("index", escapeSpaces(index))
+                          .pathParam("type", type)
+                          .content(MediaType.JSON, content)
+                          .build();
+    }
+
+    /**
+     * Builds {@code PUT /{index}/{type}/{id}} with {@code doc} serialized as the JSON body.
+     *
+     * @param params query parameters added to the request, ignored when {@code null}.
+     */
+    @SneakyThrows
+    @Override
+    public HttpRequest index(String index, String type, String id,
+                             Map<String, Object> doc,
+                             Map<String, Object> params) {
+        final HttpRequestBuilder builder = HttpRequest.builder();
+        if (params != null) {
+            params.forEach(builder::queryParam);
+        }
+        final byte[] content = JsonSerializer.MAPPER.writeValueAsBytes(doc);
+
+        builder.put("/{index}/{type}/{id}")
+               .pathParam("index", escapeSpaces(index))
+               .pathParam("type", type)
+               .pathParam("id", escapeSpaces(id))
+               .content(MediaType.JSON, content);
+
+        return builder.build();
+    }
+
+    /**
+     * Builds {@code POST /{index}/{type}/{id}/_update} with {@code doc} serialized as the
+     * JSON body.
+     *
+     * @param params query parameters added to the request, ignored when {@code null}.
+     */
+    @SneakyThrows
+    @Override
+    public HttpRequest update(String index, String type, String id,
+                              Map<String, Object> doc,
+                              Map<String, Object> params) {
+        final HttpRequestBuilder builder = HttpRequest.builder();
+        if (params != null) {
+            params.forEach(builder::queryParam);
+        }
+        // NOTE(review): the Update API expects the partial document under a "doc" key;
+        // the map is sent as given here - confirm that callers already wrap it.
+        final byte[] content = JsonSerializer.MAPPER.writeValueAsBytes(doc);
+
+        // The _update endpoint is documented as POST, not PUT.
+        builder.post("/{index}/{type}/{id}/_update")
+               .pathParam("index", escapeSpaces(index))
+               .pathParam("type", type)
+               .pathParam("id", escapeSpaces(id))
+               .content(MediaType.JSON, content);
+
+        return builder.build();
+    }
+
+    /**
+     * Replaces every space of {@code value} with {@code %20}; a literal replacement, no
+     * regular expression involved.
+     */
+    private static String escapeSpaces(final String value) {
+        return value.replace(" ", "%20");
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6IndexFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6IndexFactory.java
new file mode 100644
index 0000000..a356dc9
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6IndexFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.factory.v6;
+
+import com.google.common.base.Strings;
+import com.linecorp.armeria.common.HttpRequest;
+import com.linecorp.armeria.common.MediaType;
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.apache.skywalking.library.elasticsearch.requests.factory.IndexFactory;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * {@link IndexFactory} composing the index-related requests. Every method rejects a
+ * {@code null} or empty argument with an {@link IllegalArgumentException}.
+ */
+class V6IndexFactory implements IndexFactory {
+    static final IndexFactory INSTANCE = new V6IndexFactory();
+
+    /**
+     * Builds {@code HEAD /{index}}.
+     */
+    @Override
+    public HttpRequest exists(String index) {
+        requireNotEmpty(index, "index");
+
+        return HttpRequest.builder().head("/{index}").pathParam("index", index).build();
+    }
+
+    /**
+     * Builds {@code GET /{index}}.
+     */
+    @Override
+    public HttpRequest get(final String index) {
+        requireNotEmpty(index, "index");
+
+        return HttpRequest.builder().get("/{index}").pathParam("index", index).build();
+    }
+
+    /**
+     * Builds {@code PUT /{index}} without a body.
+     */
+    @SneakyThrows
+    @Override
+    public HttpRequest create(String index) {
+        requireNotEmpty(index, "index");
+
+        return HttpRequest.builder().put("/{index}").pathParam("index", index).build();
+    }
+
+    /**
+     * Builds {@code DELETE /{index}}.
+     */
+    @Override
+    public HttpRequest delete(String index) {
+        requireNotEmpty(index, "index");
+
+        return HttpRequest.builder().delete("/{index}").pathParam("index", index).build();
+    }
+
+    /**
+     * Builds {@code PUT /{index}/_mapping/{type}}; the JSON body is the value stored under
+     * the key {@code type} in {@code mapping}.
+     */
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("unchecked")
+    public HttpRequest putMapping(String index, String type,
+                                  Map<String, Object> mapping) {
+        requireNotEmpty(index, "index");
+        requireNotEmpty(type, "type");
+
+        final Map<String, Object> typeMapping = (Map<String, Object>) mapping.get(type);
+        final byte[] body = JsonSerializer.MAPPER.writeValueAsBytes(typeMapping);
+
+        return HttpRequest.builder()
+                          .put("/{index}/_mapping/{type}")
+                          .pathParam("index", index)
+                          .pathParam("type", type)
+                          .content(MediaType.JSON, body)
+                          .build();
+    }
+
+    /**
+     * Fails with an {@link IllegalArgumentException} when {@code value} is {@code null} or
+     * empty; {@code name} is the argument name used in the message.
+     */
+    private static void requireNotEmpty(final String value, final String name) {
+        checkArgument(!Strings.isNullOrEmpty(value), name + " cannot be null or empty");
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6RequestFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6RequestFactory.java
new file mode 100644
index 0000000..2356ae9
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6RequestFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.factory.v6;
+
+import org.apache.skywalking.library.elasticsearch.requests.factory.AliasFactory;
+import org.apache.skywalking.library.elasticsearch.requests.factory.DocumentFactory;
+import org.apache.skywalking.library.elasticsearch.requests.factory.IndexFactory;
+import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.library.elasticsearch.requests.factory.TemplateFactory;
+
+/**
+ * {@link RequestFactory} that hands out the shared V6 factories of this package.
+ */
+public final class V6RequestFactory implements RequestFactory {
+    public static final V6RequestFactory INSTANCE = new V6RequestFactory();
+
+    /**
+     * @return the shared {@code V6TemplateFactory}.
+     */
+    @Override
+    public TemplateFactory template() {
+        return V6TemplateFactory.INSTANCE;
+    }
+
+    /**
+     * @return the shared {@code V6IndexFactory}.
+     */
+    @Override
+    public IndexFactory index() {
+        return V6IndexFactory.INSTANCE;
+    }
+
+    /**
+     * @return the shared {@code V6AliasFactory}.
+     */
+    @Override
+    public AliasFactory alias() {
+        return V6AliasFactory.INSTANCE;
+    }
+
+    /**
+     * @return the shared {@code V6DocumentFactory}.
+     */
+    @Override
+    public DocumentFactory document() {
+        return V6DocumentFactory.INSTANCE;
+    }
+}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6TemplateFactory.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6TemplateFactory.java
new file mode 100644
index 0000000..2777867
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/V6TemplateFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.requests.factory.v6;
+
+import com.google.common.collect.ImmutableMap;
+import com.linecorp.armeria.common.HttpRequest;
+import com.linecorp.armeria.common.MediaType;
+import java.util.Collections;
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.apache.skywalking.library.elasticsearch.requests.factory.TemplateFactory;
+import org.apache.skywalking.library.elasticsearch.util.JsonSerializer;
+
+/**
+ * {@link TemplateFactory} composing the template-related requests; all of them address
+ * {@code /_template/{name}}.
+ */
+class V6TemplateFactory implements TemplateFactory {
+    static final TemplateFactory INSTANCE = new V6TemplateFactory();
+
+    private static final String TEMPLATE_PATH = "/_template/{name}";
+
+    /**
+     * Builds {@code GET /_template/{name}}, the same request as {@link #get(String)}.
+     */
+    @Override
+    public HttpRequest exists(String name) {
+        return HttpRequest.builder().get(TEMPLATE_PATH).pathParam("name", name).build();
+    }
+
+    /**
+     * Builds {@code GET /_template/{name}}.
+     */
+    @Override
+    public HttpRequest get(final String name) {
+        return HttpRequest.builder().get(TEMPLATE_PATH).pathParam("name", name).build();
+    }
+
+    /**
+     * Builds {@code DELETE /_template/{name}}.
+     */
+    @Override
+    public HttpRequest delete(final String name) {
+        return HttpRequest.builder().delete(TEMPLATE_PATH).pathParam("name", name).build();
+    }
+
+    /**
+     * Builds {@code PUT /_template/{name}}. The JSON body carries the index pattern
+     * {@code name + "-*"}, an alias called {@code name} with an empty definition, and the
+     * given {@code settings}, {@code mapping} and {@code order}.
+     */
+    @SneakyThrows
+    @Override
+    public HttpRequest createOrUpdate(String name, Map<String, Object> settings,
+                                      Map<String, Object> mapping, int order) {
+        final String[] indexPatterns = {name + "-*"};
+        final Map<String, Object> aliases = ImmutableMap.of(name, Collections.emptyMap());
+        final Map<String, Object> body = ImmutableMap.<String, Object>of(
+            "index_patterns", indexPatterns,
+            "aliases", aliases,
+            "settings", settings,
+            "mappings", mapping,
+            "order", order
+        );
+
+        return HttpRequest.builder()
+                          .put(TEMPLATE_PATH)
+                          .pathParam("name", name)
+                          .content(MediaType.JSON, JsonSerializer.MAPPER.writeValueAsBytes(body))
+                          .build();
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Document.java
similarity index 61%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Document.java
index ebad19e..89e0565 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Document.java
@@ -13,27 +13,21 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.type;
+package org.apache.skywalking.library.elasticsearch.response;
-/**
- * StorageDataComplexObject implementation supports String-Object interconversion.
- */
-public interface StorageDataComplexObject<T> {
- /**
- * @return string representing this object.
- */
- String toStorageData();
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import lombok.Data;
+
+@Data
+public final class Document { // One document of a response, bound from JSON by the annotations below.
+ private boolean found; // Documents#iterator() only yields documents whose flag is true
- /**
- * Initialize this object based on the given string data.
- */
- void toObject(String data);
+ @JsonProperty("_id")
+ private String id; // bound to the "_id" JSON property
- /**
- * Initialize the object based on the given source.
- */
- void copyFrom(T source);
+ @JsonProperty("_source")
+ private Map<String, Object> source; // bound to the "_source" JSON property
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Documents.java
similarity index 61%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Documents.java
index ebad19e..6084cab 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/Documents.java
@@ -13,27 +13,20 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.type;
+package org.apache.skywalking.library.elasticsearch.response;
-/**
- * StorageDataComplexObject implementation supports String-Object interconversion.
- */
-public interface StorageDataComplexObject<T> {
- /**
- * @return string representing this object.
- */
- String toStorageData();
+import java.util.Iterator;
+import java.util.List;
+import lombok.Data;
- /**
- * Initialize this object based on the given string data.
- */
- void toObject(String data);
+@Data
+public final class Documents implements Iterable<Document> {
+ private List<Document> docs;
- /**
- * Initialize the object based on the given source.
- */
- void copyFrom(T source);
+    /**
+     * Iterates over the documents that are flagged as found; documents that are not found
+     * are skipped.
+     *
+     * @return an iterator over the found documents, empty when no {@code docs} were bound.
+     */
+    @Override
+    public Iterator<Document> iterator() {
+        // "docs" stays null when the response body carries no such property.
+        if (docs == null) {
+            return java.util.Collections.emptyIterator();
+        }
+        return docs.stream().filter(Document::isFound).iterator();
+    }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/NodeInfo.java
similarity index 61%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
copy to oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/NodeInfo.java
index ebad19e..3e1ea2b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/response/NodeInfo.java
@@ -13,27 +13,18 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.type;
+package org.apache.skywalking.library.elasticsearch.response;
-/**
- * StorageDataComplexObject implementation supports String-Object interconversion.
- */
-public interface StorageDataComplexObject<T> {
- /**
- * @return string representing this object.
- */
- String toStorageData();
+import lombok.Data;
- /**
- * Initialize this object based on the given string data.
- */
- void toObject(String data);
+@Data
+public final class NodeInfo { // Node information; only the "version" property is bound.
+ @Data
+ public static class Version { // Holder of the nested "version" object.
+ private String number; // the version number, kept as the raw string
+ }
- /**
- * Initialize the object based on the given source.
- */
- void copyFrom(T source);
+ private Version version;
}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/util/JsonSerializer.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/util/JsonSerializer.java
new file mode 100644
index 0000000..3ecbd07
--- /dev/null
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/util/JsonSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.library.elasticsearch.util;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.InputStream;
+import java.util.Map;
+import lombok.SneakyThrows;
+
+/**
+ * Static helpers that read JSON through one shared, pre-configured Jackson
+ * {@link ObjectMapper}.
+ */
+public class JsonSerializer {
+ /**
+ * Mapper shared by the helpers below. It omits {@code null} properties when
+ * serializing and does not fail on JSON properties the target type lacks.
+ */
+ public static final ObjectMapper MAPPER = new ObjectMapper()
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ private static final TypeReference<Map<String, Object>> MAP_TYPE =
+ new TypeReference<Map<String, Object>>() {
+ };
+
+ /**
+ * Not instantiable: the class only exposes static members.
+ */
+ private JsonSerializer() {
+ }
+
+ /**
+ * Binds the JSON document read from {@code is} to an instance of {@code type}.
+ * Checked exceptions raised while reading are rethrown undeclared through
+ * Lombok's {@code @SneakyThrows}.
+ *
+ * @param is stream supplying the JSON document
+ * @param type class the document is bound to
+ * @param <T> type of the returned object
+ * @return the object read from the stream
+ */
+ @SneakyThrows
+ public static <T> T parse(InputStream is, Class<T> type) {
+ return MAPPER.readValue(is, type);
+ }
+
+ /**
+ * Binds the JSON document read from {@code is} to a string-keyed map.
+ * Checked exceptions raised while reading are rethrown undeclared through
+ * Lombok's {@code @SneakyThrows}.
+ *
+ * @param is stream supplying the JSON document
+ * @return the document as a {@code Map<String, Object>}
+ */
+ @SneakyThrows
+ public static Map<String, Object> parse(InputStream is) {
+ return MAPPER.readValue(is, MAP_TYPE);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java b/oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientTest.java
similarity index 61%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
copy to oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientTest.java
index ebad19e..f195ec5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/type/StorageDataComplexObject.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/test/java/org/apache/skywalking/library/elasticsearch/ElasticSearchClientTest.java
@@ -13,27 +13,20 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.type;
+package org.apache.skywalking.library.elasticsearch;
-/**
- * StorageDataComplexObject implementation supports String-Object interconversion.
- */
-public interface StorageDataComplexObject<T> {
- /**
- * @return string representing this object.
- */
- String toStorageData();
+import org.junit.Test;
- /**
- * Initialize this object based on the given string data.
- */
- void toObject(String data);
+public class ElasticSearchClientTest { // smoke test for building and connecting an ElasticSearchClient
- /**
- * Initialize the object based on the given source.
- */
- void copyFrom(T source);
+ @Test
+ public void testPing() { // no assertions: the test passes as long as none of the calls below throws
+ final ElasticSearchClient client =
+ ElasticSearchClient.builder()
+ .endpoints("localhost:9200") // hard-coded local endpoint; NOTE(review): presumably needs a live Elasticsearch there - confirm
+ .build();
+ client.connect();
+ }
}
diff --git a/oap-server/server-library/pom.xml b/oap-server/server-library/pom.xml
index acfcd44..fa37d28 100644
--- a/oap-server/server-library/pom.xml
+++ b/oap-server/server-library/pom.xml
@@ -32,6 +32,7 @@
<module>library-server</module>
<module>library-util</module>
<module>library-client</module>
+ <module>library-elasticsearch-client</module>
</modules>
<dependencies>
@@ -40,4 +41,4 @@
<artifactId>apm-util</artifactId>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index f586b6f..e798720 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -33,6 +33,7 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
super(client);
}
+ // TODO: remove
protected XContentBuilder map2builder(Map<String, Object> objectMap) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (Map.Entry<String, Object> entries: objectMap.entrySet()) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index af73fc9..5f687a1 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
@@ -53,7 +54,7 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
}
deadline = Long.parseLong(new DateTime().plusDays(-ttl).toString("yyyyMMdd"));
String tableName = IndexController.INSTANCE.getTableName(model);
- List<String> indexes = client.retrievalIndexByAliases(tableName);
+ Collection<String> indexes = client.retrievalIndexByAliases(tableName);
List<String> prepareDeleteIndexes = new ArrayList<>();
List<String> leftIndices = new ArrayList<>();
@@ -95,7 +96,7 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
}
}
String tableName = IndexController.INSTANCE.getTableName(model);
- List<String> indexes;
+ Collection<String> indexes;
try {
indexes = client.retrievalIndexByAliases(tableName);
} catch (IOException e) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java
index 2c8676c..abc67c6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
@@ -40,12 +41,13 @@ public class ManagementEsDAO extends EsDAO implements IManagementDAO {
public void insert(Model model, ManagementData managementData) throws IOException {
String tableName = IndexController.INSTANCE.getTableName(model);
String docId = IndexController.INSTANCE.generateDocId(model, managementData.id());
- final GetResponse response = getClient().get(tableName, docId);
- if (response.isExists()) {
+ final boolean exist = getClient().existDoc(tableName, docId); // only existence is checked; the stored document is not fetched
+ if (exist) { // a document with this id is already stored: leave it untouched
return;
}
- XContentBuilder builder = map2builder(
- IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(managementData)));
- getClient().forceInsert(tableName, docId, builder);
+ Map<String, Object> source = // column map handed to the client as-is, without an XContentBuilder step
+ IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(
+ managementData));
+ getClient().forceInsert(tableName, docId, source); // write the column map under docId
}
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 38d379d..e8ec5a3 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -23,8 +23,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -90,15 +93,12 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
String[] ids = metricList.stream()
.map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
.toArray(String[]::new);
- try {
- SearchResponse response = getClient().ids(tableName, ids);
- for (int i = 0; i < response.getHits().getHits().length; i++) {
- Metrics source = storageBuilder.storage2Entity(response.getHits().getAt(i).getSourceAsMap());
+ getClient().ids(tableName, ids).ifPresent(documents -> {
+ for (final Document doc : documents) {
+ Metrics source = storageBuilder.storage2Entity(doc.getSource());
result.add(source);
}
- } catch (IOException e) {
- log.error("multiGet id=" + Arrays.toString(ids) + " from " + tableName + " fails.", e);
- }
+ });
});
return result;
@@ -106,8 +106,8 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
- XContentBuilder builder = map2builder(
- IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics)));
+ Map<String, Object> builder =
+ IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics));
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
return getClient().prepareInsert(modelName, id, builder);
@@ -115,8 +115,8 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
@Override
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
- XContentBuilder builder = map2builder(
- IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics)));
+ Map<String, Object> builder =
+ IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(metrics));
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
return getClient().prepareUpdate(modelName, id, builder);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java
index 3d89243..bab0404 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
@@ -40,8 +41,9 @@ public class NoneStreamEsDAO extends EsDAO implements INoneStreamDAO {
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
- XContentBuilder builder = map2builder(
- IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(noneStream)));
+ Map<String, Object> builder =
+ IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(
+ noneStream));
String modelName = TimeSeriesUtils.writeIndexName(model, noneStream.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, noneStream.id());
getClient().forceInsert(modelName, id, builder);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
index f23bc61..26f35c7 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -19,13 +19,13 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.elasticsearch.common.xcontent.XContentBuilder;
public class RecordEsDAO extends EsDAO implements IRecordDAO {
private final StorageHashMapBuilder<Record> storageBuilder;
@@ -38,8 +38,8 @@ public class RecordEsDAO extends EsDAO implements IRecordDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- XContentBuilder builder = map2builder(
- IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record)));
+ Map<String, Object> builder =
+ IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record));
String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, record.id());
return getClient().prepareInsert(modelName, id, builder);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 33d963f..187b9fd 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -20,9 +20,13 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -43,7 +47,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
-import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
@@ -111,12 +114,15 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
}
ids.add(id);
});
+ MetricsValues metricsValues = new MetricsValues();
- SearchResponse response = getClient()
- .ids(tableName, ids.toArray(new String[0]));
- Map<String, Map<String, Object>> idMap = toMap(response);
+ Optional<Documents> response = getClient().ids(tableName, ids);
+ if (!response.isPresent()) {
+ return metricsValues;
+ }
+
+ Map<String, Map<String, Object>> idMap = toMap(response.get());
- MetricsValues metricsValues = new MetricsValues();
// Label is null, because in readMetricsValues, no label parameter.
IntValues intValues = metricsValues.getValues();
for (String id : ids) {
@@ -156,11 +162,13 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
ids.add(id);
});
- SearchResponse response = getClient().ids(tableName, ids.toArray(new String[0]));
+ Optional<Documents> response = getClient().ids(tableName, ids);
+ if (!response.isPresent()) {
+ return Collections.emptyList();
+ }
Map<String, DataTable> idMap = new HashMap<>();
- SearchHit[] hits = response.getHits().getHits();
- for (SearchHit hit : hits) {
- idMap.put(hit.getId(), new DataTable((String) hit.getSourceAsMap().getOrDefault(valueColumnName, "")));
+ for (final Document document : response.get()) {
+ idMap.put(document.getId(), new DataTable((String) document.getSource().getOrDefault(valueColumnName, "")));
}
return Util.composeLabelValue(condition, labels, ids, idMap);
}
@@ -181,11 +189,14 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
ids.add(id);
});
- SearchResponse response = getClient().ids(tableName, ids.toArray(new String[0]));
- Map<String, Map<String, Object>> idMap = toMap(response);
-
HeatMap heatMap = new HeatMap();
+ Optional<Documents> response = getClient().ids(tableName, ids);
+ if (!response.isPresent()) {
+ return heatMap;
+ }
+ Map<String, Map<String, Object>> idMap = toMap(response.get());
+
final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
for (String id : ids) {
Map<String, Object> source = idMap.get(id);
@@ -254,11 +265,10 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
sourceBuilder.size(0);
}
- private Map<String, Map<String, Object>> toMap(SearchResponse response) {
+ private Map<String, Map<String, Object>> toMap(Documents documents) { // builds an id -> source lookup from the given documents
Map<String, Map<String, Object>> result = new HashMap<>();
- SearchHit[] hits = response.getHits().getHits();
- for (SearchHit hit : hits) {
- result.put(hit.getId(), hit.getSourceAsMap());
+ for (final Document document : documents) {
+ result.put(document.getId(), document.getSource()); // a repeated id overwrites the earlier entry
}
return result;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java
index 809320c..9de38b8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.library.elasticsearch.response.Document;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
@@ -32,7 +34,6 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
-import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -83,12 +84,12 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
final UITemplate.Builder builder = new UITemplate.Builder();
final UITemplate uiTemplate = setting.toEntity();
- final GetResponse response = getClient().get(UITemplate.INDEX_NAME, uiTemplate.id());
- if (response.isExists()) {
+ final boolean exist = getClient().existDoc(UITemplate.INDEX_NAME, uiTemplate.id());
+ if (exist) {
return TemplateChangeStatus.builder().status(false).message("Template exists").build();
}
- XContentBuilder xContentBuilder = map2builder(builder.entity2Storage(uiTemplate));
+ Map<String, Object> xContentBuilder = builder.entity2Storage(uiTemplate);
getClient().forceInsert(UITemplate.INDEX_NAME, uiTemplate.id(), xContentBuilder);
return TemplateChangeStatus.builder().status(true).build();
} catch (IOException e) {
@@ -103,12 +104,12 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
final UITemplate.Builder builder = new UITemplate.Builder();
final UITemplate uiTemplate = setting.toEntity();
- final GetResponse response = getClient().get(UITemplate.INDEX_NAME, uiTemplate.id());
- if (!response.isExists()) {
+ final boolean exist = getClient().existDoc(UITemplate.INDEX_NAME, uiTemplate.id());
+ if (!exist) {
return TemplateChangeStatus.builder().status(false).message("Can't find the template").build();
}
- XContentBuilder xContentBuilder = map2builder(builder.entity2Storage(uiTemplate));
+ Map<String, Object> xContentBuilder = builder.entity2Storage(uiTemplate);
getClient().forceUpdate(UITemplate.INDEX_NAME, uiTemplate.id(), xContentBuilder);
return TemplateChangeStatus.builder().status(true).build();
} catch (IOException e) {
@@ -119,13 +120,13 @@ public class UITemplateManagementEsDAO extends EsDAO implements UITemplateManage
@Override
public TemplateChangeStatus disableTemplate(final String name) throws IOException {
- final GetResponse response = getClient().get(UITemplate.INDEX_NAME, name);
- if (response.isExists()) {
+ final Optional<Document> response = getClient().get(UITemplate.INDEX_NAME, name);
+ if (response.isPresent()) {
final UITemplate.Builder builder = new UITemplate.Builder();
- final UITemplate uiTemplate = builder.storage2Entity(response.getSourceAsMap());
+ final UITemplate uiTemplate = builder.storage2Entity(response.get().getSource());
uiTemplate.setDisabled(BooleanUtils.TRUE);
- XContentBuilder xContentBuilder = map2builder(builder.entity2Storage(uiTemplate));
+ Map<String, Object> xContentBuilder = builder.entity2Storage(uiTemplate);
getClient().forceUpdate(UITemplate.INDEX_NAME, uiTemplate.id(), xContentBuilder);
return TemplateChangeStatus.builder().status(true).build();
} else {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
index f8b68ca..596d240 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
@@ -38,6 +38,8 @@ import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.library.elasticsearch.response.Document;
+import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -49,14 +51,11 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -124,18 +123,6 @@ public class ElasticSearch7Client extends ElasticSearchClient {
return response.isAcknowledged();
}
- @Override
- public boolean createIndex(String indexName, Map<String, Object> settings,
- Map<String, Object> mapping) throws IOException {
- indexName = formatIndexName(indexName);
- CreateIndexRequest request = new CreateIndexRequest(indexName);
- request.settings(settings);
- request.mapping(mapping);
- CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
- log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
- return response.isAcknowledged();
- }
-
/**
* {@inheritDoc}
*/
@@ -310,7 +297,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
@Override
- public GetResponse get(String indexName, String id) throws IOException {
+ public Optional<Document> get(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
GetRequest request = new GetRequest(indexName, id);
try {
@@ -324,7 +311,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
@Override
- public SearchResponse ids(String indexName, String[] ids) throws IOException {
+ public Optional<Documents> ids(String indexName, String[] ids) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
@@ -393,25 +380,6 @@ public class ElasticSearch7Client extends ElasticSearchClient {
return HttpStatus.SC_OK;
}
- /**
- * @since 8.7.0 SkyWalking don't use sync bulk anymore. This method is just kept for unexpected case in the future.
- */
- @Deprecated
- @Override
- public void synchronousBulk(BulkRequest request) {
- request.timeout(TimeValue.timeValueMinutes(2));
- request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
- request.waitForActiveShards(ActiveShardCount.ONE);
- try {
- int size = request.requests().size();
- BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
- log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
- healthChecker.health();
- } catch (Throwable t) {
- healthChecker.unHealth(t);
- }
- }
-
@Override
public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = createBulkListener();