You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ta...@apache.org on 2019/05/14 13:04:46 UTC

[skywalking] branch es_7x created (now b5a4490)

This is an automated email from the ASF dual-hosted git repository.

tanjian pushed a change to branch es_7x
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at b5a4490  update es 7.x

This branch includes the following new commits:

     new b5a4490  update es 7.x

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: update es 7.x

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanjian pushed a commit to branch es_7x
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit b5a4490b00d972cb00070d789b3a8f0d1509fa95
Author: Jared.Tan <ji...@daocloud.io>
AuthorDate: Tue May 14 20:58:44 2019 +0800

    update es 7.x
---
 oap-server/pom.xml                                 |   7 +-
 .../client/elasticsearch/ElasticSearchClient.java  | 158 +++++++++++----------
 .../elasticsearch/base/HistoryDeleteEsDAO.java     |   7 +-
 .../elasticsearch/base/StorageEsInstaller.java     |  13 +-
 .../elasticsearch/lock/RegisterLockInstaller.java  |  12 +-
 5 files changed, 102 insertions(+), 95 deletions(-)

diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index db5bbe1..1e684a8 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>apm</artifactId>
         <groupId>org.apache.skywalking</groupId>
@@ -62,7 +63,7 @@
         <shardingjdbc.version>2.0.3</shardingjdbc.version>
         <commons-dbcp.version>1.4</commons-dbcp.version>
         <commons-io.version>2.6</commons-io.version>
-        <elasticsearch.version>6.3.2</elasticsearch.version>
+        <elasticsearch.version>7.0.1</elasticsearch.version>
         <joda-time.version>2.9.9</joda-time.version>
         <kubernetes.version>2.0.0</kubernetes.version>
         <hikaricp.version>3.1.0</hikaricp.version>
@@ -359,4 +360,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 90714f6..3b36c95 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -18,32 +18,58 @@
 
 package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
-import com.google.gson.*;
-import java.io.*;
-import java.util.*;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.*;
-import org.apache.http.auth.*;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.methods.*;
-import org.apache.http.entity.ContentType;
+import org.apache.http.client.methods.HttpPut;
 import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.nio.entity.NStringEntity;
 import org.apache.skywalking.oap.server.library.client.Client;
-import org.elasticsearch.action.admin.indices.create.*;
-import org.elasticsearch.action.admin.indices.delete.*;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.bulk.*;
-import org.elasticsearch.action.get.*;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.get.MultiGetRequest;
+import org.elasticsearch.action.get.MultiGetResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.unit.*;
-import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.client.indices.GetIndexResponse;
+import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -78,7 +104,7 @@ public class ElasticSearchClient implements Client {
             builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
         }
         client = new RestHighLevelClient(builder);
-        client.ping();
+        client.ping(RequestOptions.DEFAULT);
     }
 
     @Override public void shutdown() throws IOException {
@@ -102,51 +128,38 @@ public class ElasticSearchClient implements Client {
         indexName = formatIndexName(indexName);
         CreateIndexRequest request = new CreateIndexRequest(indexName);
         request.settings(settings.toString(), XContentType.JSON);
-        request.mapping(TYPE, mapping.toString(), XContentType.JSON);
-        CreateIndexResponse response = client.indices().create(request);
+        request.mapping(mapping.toString(), XContentType.JSON);
+        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
         logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
     public JsonObject getIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(indexName);
-        Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/" + indexName);
-        InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
         Gson gson = new Gson();
-        return gson.fromJson(reader, JsonObject.class);
+        return gson.fromJson(response.toString(), JsonObject.class);
     }
 
     public boolean deleteIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
         DeleteIndexRequest request = new DeleteIndexRequest(indexName);
-        DeleteIndexResponse response;
-        response = client.indices().delete(request);
+        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
         logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
     public boolean isExistsIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(indexName);
-        return client.indices().exists(request);
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        return client.indices().exists(request, RequestOptions.DEFAULT);
     }
 
     public boolean isExistsTemplate(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-
-        Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
-
-        int statusCode = response.getStatusLine().getStatusCode();
-        if (statusCode == 200) {
-            return true;
-        } else if (statusCode == 404) {
-            return false;
-        } else {
-            throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
-        }
+        IndexTemplatesExistRequest request = new IndexTemplatesExistRequest("/_template/" + indexName);
+        return client.indices().existsTemplate(request, RequestOptions.DEFAULT);
     }
 
     public boolean createTemplate(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
@@ -160,85 +173,77 @@ public class ElasticSearchClient implements Client {
         template.add("settings", settings);
         template.add("mappings", mapping);
 
-        HttpEntity entity = new NStringEntity(template.toString(), ContentType.APPLICATION_JSON);
-
-        Response response = client.getLowLevelClient().performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
+        Request request = new Request(HttpPut.METHOD_NAME, "/_template/" + indexName);
+        Response response = client.getLowLevelClient().performRequest(request);
         return response.getStatusLine().getStatusCode() == 200;
     }
 
     public boolean deleteTemplate(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-
-        Response response = client.getLowLevelClient().performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
-        return response.getStatusLine().getStatusCode() == 200;
+        DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest("/_template/" + indexName);
+        AcknowledgedResponse deleteTemplateAcknowledge = client.indices().deleteTemplate(request, RequestOptions.DEFAULT);
+        return deleteTemplateAcknowledge.isAcknowledged();
     }
 
     public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
         indexName = formatIndexName(indexName);
         SearchRequest searchRequest = new SearchRequest(indexName);
-        searchRequest.types(TYPE);
         searchRequest.source(searchSourceBuilder);
-        return client.search(searchRequest);
+        return client.search(searchRequest, RequestOptions.DEFAULT);
     }
 
     public GetResponse get(String indexName, String id) throws IOException {
         indexName = formatIndexName(indexName);
-        GetRequest request = new GetRequest(indexName, TYPE, id);
-        return client.get(request);
+        GetRequest request = new GetRequest(indexName, id);
+        return client.get(request, RequestOptions.DEFAULT);
     }
 
     public MultiGetResponse multiGet(String indexName, List<String> ids) throws IOException {
         final String newIndexName = formatIndexName(indexName);
         MultiGetRequest request = new MultiGetRequest();
-        ids.forEach(id -> request.add(newIndexName, TYPE, id));
-        return client.multiGet(request);
+        ids.forEach(id -> request.add(newIndexName, id));
+        return client.mget(request, RequestOptions.DEFAULT);
     }
 
     public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
         IndexRequest request = prepareInsert(indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.index(request);
+        client.index(request, RequestOptions.DEFAULT);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
         UpdateRequest request = prepareUpdate(indexName, id, source);
-        request.version(version);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request);
+        client.update(request, RequestOptions.DEFAULT);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
         UpdateRequest request = prepareUpdate(indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request);
+        client.update(request, RequestOptions.DEFAULT);
     }
 
     public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
         indexName = formatIndexName(indexName);
-        return new IndexRequest(indexName, TYPE, id).source(source);
+        return new IndexRequest(indexName).id(id).source(source);
     }
 
     public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
         indexName = formatIndexName(indexName);
-        return new UpdateRequest(indexName, TYPE, id).doc(source);
+        return new UpdateRequest(indexName, id).doc(source);
     }
 
-    public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
+    public long delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
         indexName = formatIndexName(indexName);
-        Map<String, String> params = Collections.singletonMap("conflicts", "proceed");
-        String jsonString = "{" +
-            "  \"query\": {" +
-            "    \"range\": {" +
-            "      \"" + timeBucketColumnName + "\": {" +
-            "        \"lte\": " + endTimeBucket +
-            "      }" +
-            "    }" +
-            "  }" +
-            "}";
-        HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
-        Response response = client.getLowLevelClient().performRequest(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
-        logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
-        return response.getStatusLine().getStatusCode();
+
+        DeleteByQueryRequest deleteByQueryRequest =
+            new DeleteByQueryRequest(indexName);
+
+        deleteByQueryRequest.setConflicts("proceed");
+        deleteByQueryRequest.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
+
+        BulkByScrollResponse bulkResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+        return bulkResponse.getStatus().getSuccessfullyProcessed();
     }
 
     public String formatIndexName(String indexName) {
@@ -271,7 +276,8 @@ public class ElasticSearchClient implements Client {
             }
         };
 
-        return BulkProcessor.builder(client::bulkAsync, listener)
+        return BulkProcessor.builder((request, bulkListener) ->
+            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
             .setBulkActions(bulkActions)
             .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
             .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index 44c6fdd..3279cb8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -21,7 +21,8 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -37,9 +38,9 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
     @Override
     public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
         ElasticSearchClient client = getClient();
-        int statusCode = client.delete(modelName, timeBucketColumnName, timeBucketBefore);
+        long processed = client.delete(modelName, timeBucketColumnName, timeBucketBefore);
         if (logger.isDebugEnabled()) {
-            logger.debug("Delete history from {} index, status code {}", client.formatIndexName(modelName), statusCode);
+            logger.debug("Delete history from {} index, processed {}", client.formatIndexName(modelName), processed);
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 47a7ad4..0b9a679 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -21,11 +21,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 import com.google.gson.JsonObject;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -101,12 +104,10 @@ public class StorageEsInstaller extends ModelInstaller {
 
     private JsonObject createMapping(Model tableDefine) {
         JsonObject mapping = new JsonObject();
-        mapping.add(ElasticSearchClient.TYPE, new JsonObject());
-
-        JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
+        mapping.add("properties", new JsonObject());
 
         JsonObject properties = new JsonObject();
-        type.add("properties", properties);
+        mapping.add("properties", properties);
 
         for (ModelColumn columnDefine : tableDefine.getColumns()) {
             if (columnDefine.isMatchQuery()) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
index 87e233f..98bf9b4 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
@@ -25,8 +25,10 @@ import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.common.xcontent.*;
-import org.slf4j.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -74,12 +76,8 @@ public class RegisterLockInstaller {
         settings.addProperty("index.refresh_interval", "1s");
 
         JsonObject mapping = new JsonObject();
-        mapping.add(ElasticSearchClient.TYPE, new JsonObject());
-
-        JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
-
         JsonObject properties = new JsonObject();
-        type.add("properties", properties);
+        mapping.add("properties", properties);
 
         JsonObject column = new JsonObject();
         column.addProperty("type", "integer");