You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/04/11 12:08:59 UTC
[incubator-skywalking] branch master updated: Provide elasticsearch template operation method. (#2474)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 73e8853 Provide elasticsearch template operation method. (#2474)
73e8853 is described below
commit 73e8853fa51fc12a94fc701dad131d5c101e6081
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Thu Apr 11 20:08:51 2019 +0800
Provide elasticsearch template operation method. (#2474)
* Provide elasticsearch template operation method.
* Fixed compile failure issue.
---
.../core/analysis/indicator/CPMIndicator.java | 4 -
.../core/analysis/indicator/CountIndicator.java | 4 -
.../analysis/indicator/DoubleAvgIndicator.java | 4 -
.../core/analysis/indicator/IndicatorMetaInfo.java | 8 --
.../core/analysis/indicator/LongAvgIndicator.java | 4 -
.../analysis/indicator/MaxDoubleIndicator.java | 4 -
.../core/analysis/indicator/MaxLongIndicator.java | 11 +-
.../core/analysis/indicator/SumIndicator.java | 4 -
.../oap/server/library/client/Client.java | 8 +-
.../client/elasticsearch/ElasticSearchClient.java | 147 +++++++++++--------
.../elasticsearch/ElasticSearchClientTestCase.java | 62 --------
.../elasticsearch/ITElasticSearchClient.java | 160 +++++++++++++++++++--
.../library-client/src/test/resources/log4j2.xml | 4 +-
.../StorageModuleElasticsearchProvider.java | 3 +-
.../elasticsearch/base/StorageEsInstaller.java | 96 +++++--------
.../elasticsearch/lock/RegisterLockInstaller.java | 36 ++---
16 files changed, 302 insertions(+), 257 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CPMIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CPMIndicator.java
index 24937e5..023c2cf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CPMIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CPMIndicator.java
@@ -48,9 +48,5 @@ public abstract class CPMIndicator extends Indicator implements LongValueHolder
@Override public void calculate() {
this.value = total / getDurationInMinute();
}
-
- @Override public long getValue() {
- return value;
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CountIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CountIndicator.java
index 474f0b5..ea99d42 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CountIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CountIndicator.java
@@ -45,8 +45,4 @@ public abstract class CountIndicator extends Indicator implements LongValueHolde
@Override public void calculate() {
}
-
- @Override public long getValue() {
- return value;
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/DoubleAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/DoubleAvgIndicator.java
index 9867fa1..9b7f9dc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/DoubleAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/DoubleAvgIndicator.java
@@ -51,8 +51,4 @@ public abstract class DoubleAvgIndicator extends Indicator implements DoubleValu
@Override public final void calculate() {
this.value = this.summation / this.count;
}
-
- @Override public double getValue() {
- return value;
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java
index 1251791..aa15787 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java
@@ -41,14 +41,6 @@ public class IndicatorMetaInfo {
this.id = id;
}
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
@Override public String toString() {
return "IndicatorMetaInfo{" +
"indicatorName='" + indicatorName + '\'' +
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/LongAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/LongAvgIndicator.java
index fa1df8b..3d4fce8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/LongAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/LongAvgIndicator.java
@@ -51,8 +51,4 @@ public abstract class LongAvgIndicator extends Indicator implements LongValueHol
@Override public final void calculate() {
this.value = this.summation / this.count;
}
-
- @Override public long getValue() {
- return value;
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxDoubleIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxDoubleIndicator.java
index ed1251a..fb5f03f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxDoubleIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxDoubleIndicator.java
@@ -46,8 +46,4 @@ public abstract class MaxDoubleIndicator extends Indicator implements DoubleValu
@Override public void calculate() {
}
-
- @Override public double getValue() {
- return value;
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxLongIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxLongIndicator.java
index db2638a..48dc5b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxLongIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxLongIndicator.java
@@ -18,11 +18,8 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.Entrance;
-import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorFunction;
-import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.SourceFrom;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
@@ -49,8 +46,4 @@ public abstract class MaxLongIndicator extends Indicator implements LongValueHol
@Override public void calculate() {
}
-
- @Override public long getValue() {
- return value;
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java
index 8bf7303..89710df 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java
@@ -45,8 +45,4 @@ public abstract class SumIndicator extends Indicator implements LongValueHolder
@Override public void calculate() {
}
-
- @Override public long getValue() {
- return value;
- }
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
index 95af9af..69f0f9f 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
@@ -18,12 +18,14 @@
package org.apache.skywalking.oap.server.library.client;
+import java.io.IOException;
+
/**
* @author peng-yongsheng
*/
public interface Client {
-
- void connect() throws ClientException;
- void shutdown();
+ void connect() throws IOException;
+
+ void shutdown() throws IOException;
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 0e65d72..90714f6 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -18,53 +18,32 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
+import com.google.gson.*;
+import java.io.*;
+import java.util.*;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.*;
+import org.apache.http.auth.*;
import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.*;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.skywalking.oap.server.library.client.Client;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.create.*;
+import org.elasticsearch.action.admin.indices.delete.*;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetRequest;
-import org.elasticsearch.action.get.MultiGetResponse;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.client.*;
+import org.elasticsearch.common.unit.*;
+import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -73,7 +52,7 @@ public class ElasticSearchClient implements Client {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
- private static final String TYPE = "type";
+ public static final String TYPE = "type";
private final String clusterNodes;
private final String namespace;
private final String user;
@@ -87,32 +66,23 @@ public class ElasticSearchClient implements Client {
this.password = password;
}
- @Override public void connect() {
+ @Override public void connect() throws IOException {
List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
RestClientBuilder builder;
if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
- .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
- @Override
- public HttpAsyncClientBuilder customizeHttpClient(
- HttpAsyncClientBuilder httpClientBuilder) {
- return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- }
- });
+ .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
} else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
}
client = new RestHighLevelClient(builder);
+ client.ping();
}
- @Override public void shutdown() {
- try {
- client.close();
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- }
+ @Override public void shutdown() throws IOException {
+ client.close();
}
private List<HttpHost> parseClusterNodes(String nodes) {
@@ -128,17 +98,26 @@ public class ElasticSearchClient implements Client {
return httpHosts;
}
- public boolean createIndex(String indexName, Settings settings,
- XContentBuilder mappingBuilder) throws IOException {
+ public boolean createIndex(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
- request.settings(settings);
- request.mapping(TYPE, mappingBuilder);
+ request.settings(settings.toString(), XContentType.JSON);
+ request.mapping(TYPE, mapping.toString(), XContentType.JSON);
CreateIndexResponse response = client.indices().create(request);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
+ public JsonObject getIndex(String indexName) throws IOException {
+ indexName = formatIndexName(indexName);
+ GetIndexRequest request = new GetIndexRequest();
+ request.indices(indexName);
+ Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/" + indexName);
+ InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
+ Gson gson = new Gson();
+ return gson.fromJson(reader, JsonObject.class);
+ }
+
public boolean deleteIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
@@ -155,6 +134,45 @@ public class ElasticSearchClient implements Client {
return client.indices().exists(request);
}
+ public boolean isExistsTemplate(String indexName) throws IOException {
+ indexName = formatIndexName(indexName);
+
+ Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
+
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == 200) {
+ return true;
+ } else if (statusCode == 404) {
+ return false;
+ } else {
+ throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
+ }
+ }
+
+ public boolean createTemplate(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
+ indexName = formatIndexName(indexName);
+
+ JsonArray patterns = new JsonArray();
+ patterns.add(indexName + "_*");
+
+ JsonObject template = new JsonObject();
+ template.add("index_patterns", patterns);
+ template.add("settings", settings);
+ template.add("mappings", mapping);
+
+ HttpEntity entity = new NStringEntity(template.toString(), ContentType.APPLICATION_JSON);
+
+ Response response = client.getLowLevelClient().performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
+ return response.getStatusLine().getStatusCode() == 200;
+ }
+
+ public boolean deleteTemplate(String indexName) throws IOException {
+ indexName = formatIndexName(indexName);
+
+ Response response = client.getLowLevelClient().performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
+ return response.getStatusLine().getStatusCode() == 200;
+ }
+
public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
@@ -218,7 +236,7 @@ public class ElasticSearchClient implements Client {
" }" +
"}";
HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
- Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
+ Response response = client.getLowLevelClient().performRequest(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
return response.getStatusLine().getStatusCode();
}
@@ -230,23 +248,26 @@ public class ElasticSearchClient implements Client {
return indexName;
}
- public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval,
- int concurrentRequests) {
+ public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
-
+ int numberOfActions = request.numberOfActions();
+ logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
- public void afterBulk(long executionId, BulkRequest request,
- BulkResponse response) {
-
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ if (response.hasFailures()) {
+ logger.warn("Bulk [{}] executed with failures", executionId);
+ } else {
+ logger.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
+ }
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
+ logger.error("Failed to execute bulk", failure);
}
};
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
deleted file mode 100644
index de56860..0000000
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
-
-import java.io.IOException;
-import org.apache.skywalking.oap.server.library.client.ClientException;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.*;
-import org.junit.Assert;
-
-/**
- * @author peng-yongsheng
- */
-public class ElasticSearchClientTestCase {
-
- public static void main(String[] args) throws IOException, ClientException {
- Settings settings = Settings.builder()
- .put("number_of_shards", 2)
- .put("number_of_replicas", 0)
- .build();
-
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject()
- .startObject("_all")
- .field("enabled", false)
- .endObject()
- .startObject("properties")
- .startObject("column1")
- .field("type", "text")
- .endObject()
- .endObject();
- builder.endObject();
-
- ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null, null, null);
- client.connect();
-
- String indexName = "test";
- client.createIndex(indexName, settings, builder);
- Assert.assertTrue(client.isExistsIndex(indexName));
- client.deleteIndex(indexName);
- Assert.assertFalse(client.isExistsIndex(indexName));
-
-
- client.shutdown();
- }
-}
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
index 7bacf58..4e7d483 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
@@ -18,29 +18,167 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
+import com.google.gson.JsonObject;
import java.io.IOException;
-import org.elasticsearch.action.get.GetResponse;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.get.*;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.*;
+import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ITElasticSearchClient {
- @Test
- public void test() throws IOException {
- ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null, null, null);
+ private static final Logger logger = LoggerFactory.getLogger(ITElasticSearchClient.class);
+
+ private ElasticSearchClient client;
+
+ @Before
+ public void before() throws IOException {
+ client = new ElasticSearchClient("localhost:9200", "", "test", "test");
client.connect();
+ }
+
+ @After
+ public void after() throws IOException {
+ client.shutdown();
+ }
+
+ @Test
+ public void indexOperate() throws IOException {
+ JsonObject settings = new JsonObject();
+ settings.addProperty("number_of_shards", 2);
+ settings.addProperty("number_of_replicas", 2);
+
+ JsonObject mapping = new JsonObject();
+ mapping.add("_doc", new JsonObject());
+
+ JsonObject doc = mapping.getAsJsonObject("_doc");
+
+ JsonObject properties = new JsonObject();
+ doc.add("properties", properties);
+
+ JsonObject column = new JsonObject();
+ column.addProperty("type", "text");
+ properties.add("column1", column);
+
+ String indexName = "test_index_operate";
+ client.createIndex(indexName, settings, doc);
+ Assert.assertTrue(client.isExistsIndex(indexName));
+
+ JsonObject index = client.getIndex(indexName);
+ logger.info(index.toString());
+
+ Assert.assertEquals(2, index.getAsJsonObject(indexName).getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
+ Assert.assertEquals(2, index.getAsJsonObject(indexName).getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
+
+ Assert.assertEquals("text", index.getAsJsonObject(indexName).getAsJsonObject("mappings").getAsJsonObject("type").getAsJsonObject("properties").getAsJsonObject("column1").get("type").getAsString());
+
+ Assert.assertTrue(client.deleteIndex(indexName));
+ }
+
+ @Test
+ public void documentOperate() throws IOException {
+ String id = String.valueOf(System.currentTimeMillis());
+
+ XContentBuilder builder = XContentFactory.jsonBuilder()
+ .startObject()
+ .field("user", "kimchy")
+ .field("post_date", "2009-11-15T14:12:12")
+ .field("message", "trying out Elasticsearch")
+ .endObject();
+
+ String indexName = "test_document_operate";
+ client.forceInsert(indexName, id, builder);
+
+ GetResponse response = client.get(indexName, id);
+ Assert.assertEquals("kimchy", response.getSource().get("user"));
+ Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message"));
+
+ builder = XContentFactory.jsonBuilder()
+ .startObject()
+ .field("user", "pengys")
+ .endObject();
+ client.forceUpdate(indexName, id, builder);
+
+ response = client.get(indexName, id);
+ Assert.assertEquals("pengys", response.getSource().get("user"));
+ Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message"));
+
+ List<String> ids = new ArrayList<>();
+ ids.add(id);
+ MultiGetResponse responses = client.multiGet(indexName, ids);
+ Assert.assertEquals(1, responses.getResponses().length);
+ Assert.assertEquals("pengys", responses.getResponses()[0].getResponse().getSource().get("user"));
+
+ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+ sourceBuilder.query(QueryBuilders.termQuery("user", "pengys"));
+ SearchResponse searchResponse = client.search(indexName, sourceBuilder);
+ Assert.assertEquals("trying out Elasticsearch", searchResponse.getHits().getHits()[0].getSourceAsMap().get("message"));
+ }
+
+ @Test
+ public void templateOperate() throws IOException {
+ JsonObject settings = new JsonObject();
+ settings.addProperty("number_of_shards", 1);
+ settings.addProperty("number_of_replicas", 0);
+ settings.addProperty("index.refresh_interval", "3s");
+ settings.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
+
+ JsonObject mapping = new JsonObject();
+ mapping.add("type", new JsonObject());
+ JsonObject doc = mapping.getAsJsonObject("type");
+
+ JsonObject properties = new JsonObject();
+ doc.add("properties", properties);
+
+ JsonObject column = new JsonObject();
+ column.addProperty("type", "text");
+ properties.add("name", column);
+
+ String indexName = "template_operate";
+
+ client.createTemplate(indexName, settings, mapping);
+
+ Assert.assertTrue(client.isExistsTemplate(indexName));
+
+ XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
+ .field("name", "pengys")
+ .endObject();
+ client.forceInsert(indexName + "_2019", "testid", builder);
+
+ JsonObject index = client.getIndex(indexName + "_2019");
+ logger.info(index.toString());
+ Assert.assertEquals(1, index.getAsJsonObject(indexName + "_2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
+ Assert.assertEquals(0, index.getAsJsonObject(indexName + "_2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
+
+ client.deleteTemplate(indexName);
+ Assert.assertFalse(client.isExistsTemplate(indexName));
+ }
+
+ @Test
+ public void bulk() throws InterruptedException {
+ BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 200, 10, 2);
- XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
- builder.field("key", "value");
- builder.endObject();
- client.forceInsert("test_index", "201904091521", builder);
+ Map<String, String> source = new HashMap<>();
+ source.put("column1", "value1");
+ source.put("column2", "value2");
- GetResponse response = client.get("test_index", "201904091521");
+ for (int i = 0; i < 100; i++) {
+ IndexRequest indexRequest = new IndexRequest("bulk_insert_test", "type", String.valueOf(i));
+ indexRequest.source(source);
+ bulkProcessor.add(indexRequest);
+ }
- Assert.assertTrue(response.getSource().containsKey("key"));
- Assert.assertEquals("value", response.getSource().get("key"));
+ bulkProcessor.flush();
+ bulkProcessor.awaitClose(2, TimeUnit.SECONDS);
}
}
diff --git a/oap-server/server-library/library-client/src/test/resources/log4j2.xml b/oap-server/server-library/library-client/src/test/resources/log4j2.xml
index 6eb5b3f..cd67282 100644
--- a/oap-server/server-library/library-client/src/test/resources/log4j2.xml
+++ b/oap-server/server-library/library-client/src/test/resources/log4j2.xml
@@ -17,14 +17,14 @@
~
-->
-<Configuration status="DEBUG">
+<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
- <Root level="DEBUG">
+ <Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index bff66df..419ed56 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
+import java.io.IOException;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.*;
@@ -98,7 +99,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
lockInstaller.install();
- } catch (StorageException e) {
+ } catch (StorageException | IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 368228e..156bf43 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -18,21 +18,14 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+import com.google.gson.JsonObject;
+import java.io.IOException;
import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
-import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -43,13 +36,13 @@ public class StorageEsInstaller extends ModelInstaller {
private final int indexShardsNumber;
private final int indexReplicasNumber;
- private final ColumnTypeEsMapping mapping;
+ private final ColumnTypeEsMapping columnTypeEsMapping;
public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) {
super(moduleManager);
this.indexShardsNumber = indexShardsNumber;
this.indexReplicasNumber = indexReplicasNumber;
- this.mapping = new ColumnTypeEsMapping();
+ this.columnTypeEsMapping = new ColumnTypeEsMapping();
}
@Override protected boolean isExists(Client client, Model tableDefine) throws StorageException {
@@ -80,20 +73,13 @@ public class StorageEsInstaller extends ModelInstaller {
@Override protected void createTable(Client client, Model tableDefine) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
- // mapping
- XContentBuilder mappingBuilder = null;
-
- Settings settings = createSettingBuilder();
- try {
- mappingBuilder = createMappingBuilder(tableDefine);
- logger.info("index {}'s mapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), Strings.toString(mappingBuilder.prettyPrint()));
- } catch (Exception e) {
- logger.error("create {} index mapping builder error, error message: {}", esClient.formatIndexName(tableDefine.getName()), e.getMessage());
- }
+ JsonObject settings = createSetting();
+ JsonObject mapping = createMapping(tableDefine);
+ logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), mapping.toString());
boolean isAcknowledged;
try {
- isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mappingBuilder);
+ isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mapping);
} catch (IOException e) {
throw new StorageException(e.getMessage());
}
@@ -104,50 +90,46 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
- private Settings createSettingBuilder() {
- return Settings.builder()
- .put("index.number_of_shards", indexShardsNumber)
- .put("index.number_of_replicas", indexReplicasNumber)
- .put("index.refresh_interval", "3s")
- .put("analysis.analyzer.oap_analyzer.type", "stop")
- .build();
+ /**
+ * Assembles the index-level settings as a flat JSON object. Shard and replica counts
+ * come from this installer's configuration; the refresh interval is fixed at "3s" and
+ * the oap_analyzer analyzer is declared with type "stop".
+ *
+ * @return the settings object handed to the index creation call
+ */
+ private JsonObject createSetting() {
+ JsonObject indexSettings = new JsonObject();
+ indexSettings.addProperty("index.number_of_shards", indexShardsNumber);
+ indexSettings.addProperty("index.number_of_replicas", indexReplicasNumber);
+ indexSettings.addProperty("index.refresh_interval", "3s");
+ indexSettings.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
+ return indexSettings;
}
- private XContentBuilder createMappingBuilder(Model tableDefine) throws IOException {
- XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
- .startObject()
- .startObject("_all")
- .field("enabled", false)
- .endObject()
- .startObject("properties");
+ /**
+ * Builds the mapping JSON for one model: a single type object keyed by
+ * ElasticSearchClient.TYPE that holds a "properties" object with one entry per column.
+ * A column flagged as match-query additionally gets a companion "text" field, named by
+ * MatchCNameBuilder and analysed with oap_analyzer, which the original column feeds
+ * through "copy_to".
+ *
+ * @param tableDefine model whose columns are turned into field mappings
+ * @return the mapping object handed to the index creation call
+ */
+ private JsonObject createMapping(Model tableDefine) {
+ JsonObject mapping = new JsonObject();
+ mapping.add(ElasticSearchClient.TYPE, new JsonObject());
+
+ JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
+
+ JsonObject properties = new JsonObject();
+ type.add("properties", properties);
for (ModelColumn columnDefine : tableDefine.getColumns()) {
if (columnDefine.isMatchQuery()) {
String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
- mappingBuilder
- .startObject(columnDefine.getColumnName().getName())
- .field("type", mapping.transform(columnDefine.getType()))
- .field("copy_to", matchCName)
- .endObject()
- .startObject(matchCName)
- .field("type", "text")
- .field("analyzer", "oap_analyzer")
- .endObject();
+ JsonObject originalColumn = new JsonObject();
+ originalColumn.addProperty("type", columnTypeEsMapping.transform(columnDefine.getType()));
+ originalColumn.addProperty("copy_to", matchCName);
+ properties.add(columnDefine.getColumnName().getName(), originalColumn);
+
+ JsonObject matchColumn = new JsonObject();
+ matchColumn.addProperty("type", "text");
+ matchColumn.addProperty("analyzer", "oap_analyzer");
+ // Register the analysed field under matchCName (the copy_to target). Re-using the
+ // original column name here would replace the entry added just above.
+ properties.add(matchCName, matchColumn);
} else {
- mappingBuilder
- .startObject(columnDefine.getColumnName().getName())
- .field("type", mapping.transform(columnDefine.getType()))
- .endObject();
+ JsonObject column = new JsonObject();
+ column.addProperty("type", columnTypeEsMapping.transform(columnDefine.getType()));
+ properties.add(columnDefine.getColumnName().getName(), column);
}
}
- mappingBuilder
- .endObject()
- .endObject();
-
- logger.debug("create elasticsearch index: {}", mappingBuilder.prettyPrint());
+ logger.debug("create elasticsearch index: {}", mapping.toString());
- return mappingBuilder;
+ return mapping;
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
index 72ff4f0..87e233f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
+import com.google.gson.JsonObject;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
@@ -68,22 +68,24 @@ public class RegisterLockInstaller {
}
private void createIndex() throws IOException {
- Settings settings = Settings.builder()
- .put("index.number_of_shards", 1)
- .put("index.number_of_replicas", 0)
- .put("index.refresh_interval", "1s")
- .build();
-
- XContentBuilder source = XContentFactory.jsonBuilder()
- .startObject()
- .startObject("properties")
- .startObject(RegisterLockIndex.COLUMN_SEQUENCE)
- .field("type", "integer")
- .endObject()
- .endObject()
- .endObject();
-
- client.createIndex(RegisterLockIndex.NAME, settings, source);
+ // Settings for the register-lock index: one shard, no replicas, 1s refresh interval.
+ JsonObject indexSettings = new JsonObject();
+ indexSettings.addProperty("index.number_of_shards", 1);
+ indexSettings.addProperty("index.number_of_replicas", 0);
+ indexSettings.addProperty("index.refresh_interval", "1s");
+
+ // Mapping is assembled innermost-first: column -> properties -> type -> mapping.
+ JsonObject sequenceColumn = new JsonObject();
+ sequenceColumn.addProperty("type", "integer");
+
+ JsonObject columns = new JsonObject();
+ columns.add(RegisterLockIndex.COLUMN_SEQUENCE, sequenceColumn);
+
+ JsonObject typeMapping = new JsonObject();
+ typeMapping.add("properties", columns);
+
+ JsonObject indexMapping = new JsonObject();
+ indexMapping.add(ElasticSearchClient.TYPE, typeMapping);
+
+ client.createIndex(RegisterLockIndex.NAME, indexSettings, indexMapping);
}
private void putIfAbsent(int scopeId) throws IOException {