You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/04/11 12:08:59 UTC

[incubator-skywalking] branch master updated: Provide elasticsearch template operation method. (#2474)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 73e8853  Provide elasticsearch template operation method. (#2474)
73e8853 is described below

commit 73e8853fa51fc12a94fc701dad131d5c101e6081
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Thu Apr 11 20:08:51 2019 +0800

    Provide elasticsearch template operation method. (#2474)
    
    * Provide elasticsearch template operation method.
    
    * Fixed compile failure issue.
---
 .../core/analysis/indicator/CPMIndicator.java      |   4 -
 .../core/analysis/indicator/CountIndicator.java    |   4 -
 .../analysis/indicator/DoubleAvgIndicator.java     |   4 -
 .../core/analysis/indicator/IndicatorMetaInfo.java |   8 --
 .../core/analysis/indicator/LongAvgIndicator.java  |   4 -
 .../analysis/indicator/MaxDoubleIndicator.java     |   4 -
 .../core/analysis/indicator/MaxLongIndicator.java  |  11 +-
 .../core/analysis/indicator/SumIndicator.java      |   4 -
 .../oap/server/library/client/Client.java          |   8 +-
 .../client/elasticsearch/ElasticSearchClient.java  | 147 +++++++++++--------
 .../elasticsearch/ElasticSearchClientTestCase.java |  62 --------
 .../elasticsearch/ITElasticSearchClient.java       | 160 +++++++++++++++++++--
 .../library-client/src/test/resources/log4j2.xml   |   4 +-
 .../StorageModuleElasticsearchProvider.java        |   3 +-
 .../elasticsearch/base/StorageEsInstaller.java     |  96 +++++--------
 .../elasticsearch/lock/RegisterLockInstaller.java  |  36 ++---
 16 files changed, 302 insertions(+), 257 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CPMIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CPMIndicator.java
index 24937e5..023c2cf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CPMIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CPMIndicator.java
@@ -48,9 +48,5 @@ public abstract class CPMIndicator extends Indicator implements LongValueHolder
     @Override public void calculate() {
         this.value = total / getDurationInMinute();
     }
-
-    @Override public long getValue() {
-        return value;
-    }
 }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CountIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CountIndicator.java
index 474f0b5..ea99d42 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CountIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/CountIndicator.java
@@ -45,8 +45,4 @@ public abstract class CountIndicator extends Indicator implements LongValueHolde
 
     @Override public void calculate() {
     }
-
-    @Override public long getValue() {
-        return value;
-    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/DoubleAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/DoubleAvgIndicator.java
index 9867fa1..9b7f9dc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/DoubleAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/DoubleAvgIndicator.java
@@ -51,8 +51,4 @@ public abstract class DoubleAvgIndicator extends Indicator implements DoubleValu
     @Override public final void calculate() {
         this.value = this.summation / this.count;
     }
-
-    @Override public double getValue() {
-        return value;
-    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java
index 1251791..aa15787 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java
@@ -41,14 +41,6 @@ public class IndicatorMetaInfo {
         this.id = id;
     }
 
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
     @Override public String toString() {
         return "IndicatorMetaInfo{" +
             "indicatorName='" + indicatorName + '\'' +
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/LongAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/LongAvgIndicator.java
index fa1df8b..3d4fce8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/LongAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/LongAvgIndicator.java
@@ -51,8 +51,4 @@ public abstract class LongAvgIndicator extends Indicator implements LongValueHol
     @Override public final void calculate() {
         this.value = this.summation / this.count;
     }
-
-    @Override public long getValue() {
-        return value;
-    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxDoubleIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxDoubleIndicator.java
index ed1251a..fb5f03f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxDoubleIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxDoubleIndicator.java
@@ -46,8 +46,4 @@ public abstract class MaxDoubleIndicator extends Indicator implements DoubleValu
 
     @Override public void calculate() {
     }
-
-    @Override public double getValue() {
-        return value;
-    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxLongIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxLongIndicator.java
index db2638a..48dc5b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxLongIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/MaxLongIndicator.java
@@ -18,11 +18,8 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.Entrance;
-import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorFunction;
-import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.SourceFrom;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
@@ -49,8 +46,4 @@ public abstract class MaxLongIndicator extends Indicator implements LongValueHol
 
     @Override public void calculate() {
     }
-
-    @Override public long getValue() {
-        return value;
-    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java
index 8bf7303..89710df 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java
@@ -45,8 +45,4 @@ public abstract class SumIndicator extends Indicator implements LongValueHolder
 
     @Override public void calculate() {
     }
-
-    @Override public long getValue() {
-        return value;
-    }
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
index 95af9af..69f0f9f 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/Client.java
@@ -18,12 +18,14 @@
 
 package org.apache.skywalking.oap.server.library.client;
 
+import java.io.IOException;
+
 /**
  * @author peng-yongsheng
  */
 public interface Client {
-    
-    void connect() throws ClientException;
 
-    void shutdown();
+    void connect() throws IOException;
+
+    void shutdown() throws IOException;
 }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 0e65d72..90714f6 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -18,53 +18,32 @@
 
 package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
+import com.google.gson.*;
+import java.io.*;
+import java.util.*;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.*;
+import org.apache.http.auth.*;
 import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.*;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.http.nio.entity.NStringEntity;
 import org.apache.skywalking.oap.server.library.client.Client;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.create.*;
+import org.elasticsearch.action.admin.indices.delete.*;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetRequest;
-import org.elasticsearch.action.get.MultiGetResponse;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.get.*;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.*;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.client.*;
+import org.elasticsearch.common.unit.*;
+import org.elasticsearch.common.xcontent.*;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -73,7 +52,7 @@ public class ElasticSearchClient implements Client {
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
 
-    private static final String TYPE = "type";
+    public static final String TYPE = "type";
     private final String clusterNodes;
     private final String namespace;
     private final String user;
@@ -87,32 +66,23 @@ public class ElasticSearchClient implements Client {
         this.password = password;
     }
 
-    @Override public void connect() {
+    @Override public void connect() throws IOException {
         List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
         RestClientBuilder builder;
         if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)) {
             final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
             credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
             builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
-                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
-                        @Override
-                        public HttpAsyncClientBuilder customizeHttpClient(
-                                HttpAsyncClientBuilder httpClientBuilder) {
-                            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                        }
-                    });
+                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
         } else {
             builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
         }
         client = new RestHighLevelClient(builder);
+        client.ping();
     }
 
-    @Override public void shutdown() {
-        try {
-            client.close();
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-        }
+    @Override public void shutdown() throws IOException {
+        client.close();
     }
 
     private List<HttpHost> parseClusterNodes(String nodes) {
@@ -128,17 +98,26 @@ public class ElasticSearchClient implements Client {
         return httpHosts;
     }
 
-    public boolean createIndex(String indexName, Settings settings,
-        XContentBuilder mappingBuilder) throws IOException {
+    public boolean createIndex(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
         indexName = formatIndexName(indexName);
         CreateIndexRequest request = new CreateIndexRequest(indexName);
-        request.settings(settings);
-        request.mapping(TYPE, mappingBuilder);
+        request.settings(settings.toString(), XContentType.JSON);
+        request.mapping(TYPE, mapping.toString(), XContentType.JSON);
         CreateIndexResponse response = client.indices().create(request);
         logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
+    public JsonObject getIndex(String indexName) throws IOException {
+        indexName = formatIndexName(indexName);
+        GetIndexRequest request = new GetIndexRequest();
+        request.indices(indexName);
+        Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/" + indexName);
+        InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
+        Gson gson = new Gson();
+        return gson.fromJson(reader, JsonObject.class);
+    }
+
     public boolean deleteIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
         DeleteIndexRequest request = new DeleteIndexRequest(indexName);
@@ -155,6 +134,45 @@ public class ElasticSearchClient implements Client {
         return client.indices().exists(request);
     }
 
+    public boolean isExistsTemplate(String indexName) throws IOException {
+        indexName = formatIndexName(indexName);
+
+        Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
+
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode == 200) {
+            return true;
+        } else if (statusCode == 404) {
+            return false;
+        } else {
+            throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
+        }
+    }
+
+    public boolean createTemplate(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
+        indexName = formatIndexName(indexName);
+
+        JsonArray patterns = new JsonArray();
+        patterns.add(indexName + "_*");
+
+        JsonObject template = new JsonObject();
+        template.add("index_patterns", patterns);
+        template.add("settings", settings);
+        template.add("mappings", mapping);
+
+        HttpEntity entity = new NStringEntity(template.toString(), ContentType.APPLICATION_JSON);
+
+        Response response = client.getLowLevelClient().performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
+        return response.getStatusLine().getStatusCode() == 200;
+    }
+
+    public boolean deleteTemplate(String indexName) throws IOException {
+        indexName = formatIndexName(indexName);
+
+        Response response = client.getLowLevelClient().performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
+        return response.getStatusLine().getStatusCode() == 200;
+    }
+
     public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
         indexName = formatIndexName(indexName);
         SearchRequest searchRequest = new SearchRequest(indexName);
@@ -218,7 +236,7 @@ public class ElasticSearchClient implements Client {
             "  }" +
             "}";
         HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
-        Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
+        Response response = client.getLowLevelClient().performRequest(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
         logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
         return response.getStatusLine().getStatusCode();
     }
@@ -230,23 +248,26 @@ public class ElasticSearchClient implements Client {
         return indexName;
     }
 
-    public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval,
-        int concurrentRequests) {
+    public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) {
         BulkProcessor.Listener listener = new BulkProcessor.Listener() {
             @Override
             public void beforeBulk(long executionId, BulkRequest request) {
-
+                int numberOfActions = request.numberOfActions();
+                logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
             }
 
             @Override
-            public void afterBulk(long executionId, BulkRequest request,
-                BulkResponse response) {
-
+            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                if (response.hasFailures()) {
+                    logger.warn("Bulk [{}] executed with failures", executionId);
+                } else {
+                    logger.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
+                }
             }
 
             @Override
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
+                logger.error("Failed to execute bulk", failure);
             }
         };
 
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
deleted file mode 100644
index de56860..0000000
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
-
-import java.io.IOException;
-import org.apache.skywalking.oap.server.library.client.ClientException;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.*;
-import org.junit.Assert;
-
-/**
- * @author peng-yongsheng
- */
-public class ElasticSearchClientTestCase {
-
-    public static void main(String[] args) throws IOException, ClientException {
-        Settings settings = Settings.builder()
-            .put("number_of_shards", 2)
-            .put("number_of_replicas", 0)
-            .build();
-
-        XContentBuilder builder = XContentFactory.jsonBuilder();
-        builder.startObject()
-            .startObject("_all")
-            .field("enabled", false)
-            .endObject()
-            .startObject("properties")
-            .startObject("column1")
-            .field("type", "text")
-            .endObject()
-            .endObject();
-        builder.endObject();
-
-        ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null, null, null);
-        client.connect();
-
-        String indexName = "test";
-        client.createIndex(indexName, settings, builder);
-        Assert.assertTrue(client.isExistsIndex(indexName));
-        client.deleteIndex(indexName);
-        Assert.assertFalse(client.isExistsIndex(indexName));
-
-
-        client.shutdown();
-    }
-}
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
index 7bacf58..4e7d483 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
@@ -18,29 +18,167 @@
 
 package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
+import com.google.gson.JsonObject;
 import java.io.IOException;
-import org.elasticsearch.action.get.GetResponse;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.get.*;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.junit.*;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
 public class ITElasticSearchClient {
 
-    @Test
-    public void test() throws IOException {
-        ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null, null, null);
+    private static final Logger logger = LoggerFactory.getLogger(ITElasticSearchClient.class);
+
+    private ElasticSearchClient client;
+
+    @Before
+    public void before() throws IOException {
+        client = new ElasticSearchClient("localhost:9200", "", "test", "test");
         client.connect();
+    }
+
+    @After
+    public void after() throws IOException {
+        client.shutdown();
+    }
+
+    @Test
+    public void indexOperate() throws IOException {
+        JsonObject settings = new JsonObject();
+        settings.addProperty("number_of_shards", 2);
+        settings.addProperty("number_of_replicas", 2);
+
+        JsonObject mapping = new JsonObject();
+        mapping.add("_doc", new JsonObject());
+
+        JsonObject doc = mapping.getAsJsonObject("_doc");
+
+        JsonObject properties = new JsonObject();
+        doc.add("properties", properties);
+
+        JsonObject column = new JsonObject();
+        column.addProperty("type", "text");
+        properties.add("column1", column);
+
+        String indexName = "test_index_operate";
+        client.createIndex(indexName, settings, doc);
+        Assert.assertTrue(client.isExistsIndex(indexName));
+
+        JsonObject index = client.getIndex(indexName);
+        logger.info(index.toString());
+
+        Assert.assertEquals(2, index.getAsJsonObject(indexName).getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
+        Assert.assertEquals(2, index.getAsJsonObject(indexName).getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
+
+        Assert.assertEquals("text", index.getAsJsonObject(indexName).getAsJsonObject("mappings").getAsJsonObject("type").getAsJsonObject("properties").getAsJsonObject("column1").get("type").getAsString());
+
+        Assert.assertTrue(client.deleteIndex(indexName));
+    }
+
+    @Test
+    public void documentOperate() throws IOException {
+        String id = String.valueOf(System.currentTimeMillis());
+
+        XContentBuilder builder = XContentFactory.jsonBuilder()
+            .startObject()
+            .field("user", "kimchy")
+            .field("post_date", "2009-11-15T14:12:12")
+            .field("message", "trying out Elasticsearch")
+            .endObject();
+
+        String indexName = "test_document_operate";
+        client.forceInsert(indexName, id, builder);
+
+        GetResponse response = client.get(indexName, id);
+        Assert.assertEquals("kimchy", response.getSource().get("user"));
+        Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message"));
+
+        builder = XContentFactory.jsonBuilder()
+            .startObject()
+            .field("user", "pengys")
+            .endObject();
+        client.forceUpdate(indexName, id, builder);
+
+        response = client.get(indexName, id);
+        Assert.assertEquals("pengys", response.getSource().get("user"));
+        Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message"));
+
+        List<String> ids = new ArrayList<>();
+        ids.add(id);
+        MultiGetResponse responses = client.multiGet(indexName, ids);
+        Assert.assertEquals(1, responses.getResponses().length);
+        Assert.assertEquals("pengys", responses.getResponses()[0].getResponse().getSource().get("user"));
+
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+        sourceBuilder.query(QueryBuilders.termQuery("user", "pengys"));
+        SearchResponse searchResponse = client.search(indexName, sourceBuilder);
+        Assert.assertEquals("trying out Elasticsearch", searchResponse.getHits().getHits()[0].getSourceAsMap().get("message"));
+    }
+
+    @Test
+    public void templateOperate() throws IOException {
+        JsonObject settings = new JsonObject();
+        settings.addProperty("number_of_shards", 1);
+        settings.addProperty("number_of_replicas", 0);
+        settings.addProperty("index.refresh_interval", "3s");
+        settings.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
+
+        JsonObject mapping = new JsonObject();
+        mapping.add("type", new JsonObject());
+        JsonObject doc = mapping.getAsJsonObject("type");
+
+        JsonObject properties = new JsonObject();
+        doc.add("properties", properties);
+
+        JsonObject column = new JsonObject();
+        column.addProperty("type", "text");
+        properties.add("name", column);
+
+        String indexName = "template_operate";
+
+        client.createTemplate(indexName, settings, mapping);
+
+        Assert.assertTrue(client.isExistsTemplate(indexName));
+
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
+            .field("name", "pengys")
+            .endObject();
+        client.forceInsert(indexName + "_2019", "testid", builder);
+
+        JsonObject index = client.getIndex(indexName + "_2019");
+        logger.info(index.toString());
+        Assert.assertEquals(1, index.getAsJsonObject(indexName + "_2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
+        Assert.assertEquals(0, index.getAsJsonObject(indexName + "_2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
+
+        client.deleteTemplate(indexName);
+        Assert.assertFalse(client.isExistsTemplate(indexName));
+    }
+
+    @Test
+    public void bulk() throws InterruptedException {
+        BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 200, 10, 2);
 
-        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
-        builder.field("key", "value");
-        builder.endObject();
-        client.forceInsert("test_index", "201904091521", builder);
+        Map<String, String> source = new HashMap<>();
+        source.put("column1", "value1");
+        source.put("column2", "value2");
 
-        GetResponse response = client.get("test_index", "201904091521");
+        for (int i = 0; i < 100; i++) {
+            IndexRequest indexRequest = new IndexRequest("bulk_insert_test", "type", String.valueOf(i));
+            indexRequest.source(source);
+            bulkProcessor.add(indexRequest);
+        }
 
-        Assert.assertTrue(response.getSource().containsKey("key"));
-        Assert.assertEquals("value", response.getSource().get("key"));
+        bulkProcessor.flush();
+        bulkProcessor.awaitClose(2, TimeUnit.SECONDS);
     }
 }
diff --git a/oap-server/server-library/library-client/src/test/resources/log4j2.xml b/oap-server/server-library/library-client/src/test/resources/log4j2.xml
index 6eb5b3f..cd67282 100644
--- a/oap-server/server-library/library-client/src/test/resources/log4j2.xml
+++ b/oap-server/server-library/library-client/src/test/resources/log4j2.xml
@@ -17,14 +17,14 @@
   ~
   -->
 
-<Configuration status="DEBUG">
+<Configuration status="INFO">
     <Appenders>
         <Console name="Console" target="SYSTEM_OUT">
             <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
         </Console>
     </Appenders>
     <Loggers>
-        <Root level="DEBUG">
+        <Root level="INFO">
             <AppenderRef ref="Console"/>
         </Root>
     </Loggers>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index bff66df..419ed56 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
 
+import java.io.IOException;
 import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.storage.*;
@@ -98,7 +99,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
 
             RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
             lockInstaller.install();
-        } catch (StorageException e) {
+        } catch (StorageException | IOException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 368228e..156bf43 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -18,21 +18,14 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
+import com.google.gson.JsonObject;
+import java.io.IOException;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
-import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -43,13 +36,13 @@ public class StorageEsInstaller extends ModelInstaller {
 
     private final int indexShardsNumber;
     private final int indexReplicasNumber;
-    private final ColumnTypeEsMapping mapping;
+    private final ColumnTypeEsMapping columnTypeEsMapping;
 
     public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) {
         super(moduleManager);
         this.indexShardsNumber = indexShardsNumber;
         this.indexReplicasNumber = indexReplicasNumber;
-        this.mapping = new ColumnTypeEsMapping();
+        this.columnTypeEsMapping = new ColumnTypeEsMapping();
     }
 
     @Override protected boolean isExists(Client client, Model tableDefine) throws StorageException {
@@ -80,20 +73,13 @@ public class StorageEsInstaller extends ModelInstaller {
     @Override protected void createTable(Client client, Model tableDefine) throws StorageException {
         ElasticSearchClient esClient = (ElasticSearchClient)client;
 
-        // mapping
-        XContentBuilder mappingBuilder = null;
-
-        Settings settings = createSettingBuilder();
-        try {
-            mappingBuilder = createMappingBuilder(tableDefine);
-            logger.info("index {}'s mapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), Strings.toString(mappingBuilder.prettyPrint()));
-        } catch (Exception e) {
-            logger.error("create {} index mapping builder error, error message: {}", esClient.formatIndexName(tableDefine.getName()), e.getMessage());
-        }
+        JsonObject settings = createSetting();
+        JsonObject mapping = createMapping(tableDefine);
+        logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), mapping.toString());
 
         boolean isAcknowledged;
         try {
-            isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mappingBuilder);
+            isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mapping);
         } catch (IOException e) {
             throw new StorageException(e.getMessage());
         }
@@ -104,50 +90,46 @@ public class StorageEsInstaller extends ModelInstaller {
         }
     }
 
-    private Settings createSettingBuilder() {
-        return Settings.builder()
-            .put("index.number_of_shards", indexShardsNumber)
-            .put("index.number_of_replicas", indexReplicasNumber)
-            .put("index.refresh_interval", "3s")
-            .put("analysis.analyzer.oap_analyzer.type", "stop")
-            .build();
+    private JsonObject createSetting() {
+        JsonObject setting = new JsonObject();
+        setting.addProperty("index.number_of_shards", indexShardsNumber);
+        setting.addProperty("index.number_of_replicas", indexReplicasNumber);
+        setting.addProperty("index.refresh_interval", "3s");
+        setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
+        return setting;
     }
 
-    private XContentBuilder createMappingBuilder(Model tableDefine) throws IOException {
-        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
-            .startObject()
-            .startObject("_all")
-            .field("enabled", false)
-            .endObject()
-            .startObject("properties");
+    private JsonObject createMapping(Model tableDefine) {
+        JsonObject mapping = new JsonObject();
+        mapping.add(ElasticSearchClient.TYPE, new JsonObject());
+
+        JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
+
+        JsonObject properties = new JsonObject();
+        type.add("properties", properties);
 
         for (ModelColumn columnDefine : tableDefine.getColumns()) {
             if (columnDefine.isMatchQuery()) {
                 String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
 
-                mappingBuilder
-                    .startObject(columnDefine.getColumnName().getName())
-                    .field("type", mapping.transform(columnDefine.getType()))
-                    .field("copy_to", matchCName)
-                    .endObject()
-                    .startObject(matchCName)
-                    .field("type", "text")
-                    .field("analyzer", "oap_analyzer")
-                    .endObject();
+                JsonObject originalColumn = new JsonObject();
+                originalColumn.addProperty("type", columnTypeEsMapping.transform(columnDefine.getType()));
+                originalColumn.addProperty("copy_to", matchCName);
+                properties.add(columnDefine.getColumnName().getName(), originalColumn);
+
+                JsonObject matchColumn = new JsonObject();
+                matchColumn.addProperty("type", "text");
+                matchColumn.addProperty("analyzer", "oap_analyzer");
+                properties.add(columnDefine.getColumnName().getName(), matchColumn);
             } else {
-                mappingBuilder
-                    .startObject(columnDefine.getColumnName().getName())
-                    .field("type", mapping.transform(columnDefine.getType()))
-                    .endObject();
+                JsonObject column = new JsonObject();
+                column.addProperty("type", columnTypeEsMapping.transform(columnDefine.getType()));
+                properties.add(columnDefine.getColumnName().getName(), column);
             }
         }
 
-        mappingBuilder
-            .endObject()
-            .endObject();
-
-        logger.debug("create elasticsearch index: {}", mappingBuilder.prettyPrint());
+        logger.debug("create elasticsearch index: {}", mapping.toString());
 
-        return mappingBuilder;
+        return mapping;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
index 72ff4f0..87e233f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
 
+import com.google.gson.JsonObject;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.*;
 import org.slf4j.*;
 
@@ -68,22 +68,24 @@ public class RegisterLockInstaller {
     }
 
     private void createIndex() throws IOException {
-        Settings settings = Settings.builder()
-            .put("index.number_of_shards", 1)
-            .put("index.number_of_replicas", 0)
-            .put("index.refresh_interval", "1s")
-            .build();
-
-        XContentBuilder source = XContentFactory.jsonBuilder()
-            .startObject()
-            .startObject("properties")
-            .startObject(RegisterLockIndex.COLUMN_SEQUENCE)
-            .field("type", "integer")
-            .endObject()
-            .endObject()
-            .endObject();
-
-        client.createIndex(RegisterLockIndex.NAME, settings, source);
+        JsonObject settings = new JsonObject();
+        settings.addProperty("index.number_of_shards", 1);
+        settings.addProperty("index.number_of_replicas", 0);
+        settings.addProperty("index.refresh_interval", "1s");
+
+        JsonObject mapping = new JsonObject();
+        mapping.add(ElasticSearchClient.TYPE, new JsonObject());
+
+        JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
+
+        JsonObject properties = new JsonObject();
+        type.add("properties", properties);
+
+        JsonObject column = new JsonObject();
+        column.addProperty("type", "integer");
+        properties.add(RegisterLockIndex.COLUMN_SEQUENCE, column);
+
+        client.createIndex(RegisterLockIndex.NAME, settings, mapping);
     }
 
     private void putIfAbsent(int scopeId) throws IOException {