You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ta...@apache.org on 2019/05/14 13:04:47 UTC

[skywalking] 01/01: update es 7.x

This is an automated email from the ASF dual-hosted git repository.

tanjian pushed a commit to branch es_7x
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit b5a4490b00d972cb00070d789b3a8f0d1509fa95
Author: Jared.Tan <ji...@daocloud.io>
AuthorDate: Tue May 14 20:58:44 2019 +0800

    update es 7.x
---
 oap-server/pom.xml                                 |   7 +-
 .../client/elasticsearch/ElasticSearchClient.java  | 158 +++++++++++----------
 .../elasticsearch/base/HistoryDeleteEsDAO.java     |   7 +-
 .../elasticsearch/base/StorageEsInstaller.java     |  13 +-
 .../elasticsearch/lock/RegisterLockInstaller.java  |  12 +-
 5 files changed, 102 insertions(+), 95 deletions(-)

diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index db5bbe1..1e684a8 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>apm</artifactId>
         <groupId>org.apache.skywalking</groupId>
@@ -62,7 +63,7 @@
         <shardingjdbc.version>2.0.3</shardingjdbc.version>
         <commons-dbcp.version>1.4</commons-dbcp.version>
         <commons-io.version>2.6</commons-io.version>
-        <elasticsearch.version>6.3.2</elasticsearch.version>
+        <elasticsearch.version>7.0.1</elasticsearch.version>
         <joda-time.version>2.9.9</joda-time.version>
         <kubernetes.version>2.0.0</kubernetes.version>
         <hikaricp.version>3.1.0</hikaricp.version>
@@ -359,4 +360,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 90714f6..3b36c95 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -18,32 +18,58 @@
 
 package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
-import com.google.gson.*;
-import java.io.*;
-import java.util.*;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.*;
-import org.apache.http.auth.*;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.methods.*;
-import org.apache.http.entity.ContentType;
+import org.apache.http.client.methods.HttpPut;
 import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.nio.entity.NStringEntity;
 import org.apache.skywalking.oap.server.library.client.Client;
-import org.elasticsearch.action.admin.indices.create.*;
-import org.elasticsearch.action.admin.indices.delete.*;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.bulk.*;
-import org.elasticsearch.action.get.*;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.get.MultiGetRequest;
+import org.elasticsearch.action.get.MultiGetResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.*;
-import org.elasticsearch.common.unit.*;
-import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.client.indices.GetIndexResponse;
+import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -78,7 +104,7 @@ public class ElasticSearchClient implements Client {
             builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
         }
         client = new RestHighLevelClient(builder);
-        client.ping();
+        client.ping(RequestOptions.DEFAULT);
     }
 
     @Override public void shutdown() throws IOException {
@@ -102,51 +128,38 @@ public class ElasticSearchClient implements Client {
         indexName = formatIndexName(indexName);
         CreateIndexRequest request = new CreateIndexRequest(indexName);
         request.settings(settings.toString(), XContentType.JSON);
-        request.mapping(TYPE, mapping.toString(), XContentType.JSON);
-        CreateIndexResponse response = client.indices().create(request);
+        request.mapping(mapping.toString(), XContentType.JSON);
+        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
         logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
     public JsonObject getIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(indexName);
-        Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/" + indexName);
-        InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
         Gson gson = new Gson();
-        return gson.fromJson(reader, JsonObject.class);
+        return gson.fromJson(response.toString(), JsonObject.class);
     }
 
     public boolean deleteIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
         DeleteIndexRequest request = new DeleteIndexRequest(indexName);
-        DeleteIndexResponse response;
-        response = client.indices().delete(request);
+        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
         logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
     public boolean isExistsIndex(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-        GetIndexRequest request = new GetIndexRequest();
-        request.indices(indexName);
-        return client.indices().exists(request);
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        return client.indices().exists(request, RequestOptions.DEFAULT);
     }
 
     public boolean isExistsTemplate(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-
-        Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
-
-        int statusCode = response.getStatusLine().getStatusCode();
-        if (statusCode == 200) {
-            return true;
-        } else if (statusCode == 404) {
-            return false;
-        } else {
-            throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
-        }
+        IndexTemplatesExistRequest request = new IndexTemplatesExistRequest("/_template/" + indexName);
+        return client.indices().existsTemplate(request, RequestOptions.DEFAULT);
     }
 
     public boolean createTemplate(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
@@ -160,85 +173,77 @@ public class ElasticSearchClient implements Client {
         template.add("settings", settings);
         template.add("mappings", mapping);
 
-        HttpEntity entity = new NStringEntity(template.toString(), ContentType.APPLICATION_JSON);
-
-        Response response = client.getLowLevelClient().performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
+        Request request = new Request(HttpPut.METHOD_NAME, "/_template/" + indexName);
+        Response response = client.getLowLevelClient().performRequest(request);
         return response.getStatusLine().getStatusCode() == 200;
     }
 
     public boolean deleteTemplate(String indexName) throws IOException {
         indexName = formatIndexName(indexName);
-
-        Response response = client.getLowLevelClient().performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
-        return response.getStatusLine().getStatusCode() == 200;
+        DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest("/_template/" + indexName);
+        AcknowledgedResponse deleteTemplateAcknowledge = client.indices().deleteTemplate(request, RequestOptions.DEFAULT);
+        return deleteTemplateAcknowledge.isAcknowledged();
     }
 
     public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
         indexName = formatIndexName(indexName);
         SearchRequest searchRequest = new SearchRequest(indexName);
-        searchRequest.types(TYPE);
         searchRequest.source(searchSourceBuilder);
-        return client.search(searchRequest);
+        return client.search(searchRequest, RequestOptions.DEFAULT);
     }
 
     public GetResponse get(String indexName, String id) throws IOException {
         indexName = formatIndexName(indexName);
-        GetRequest request = new GetRequest(indexName, TYPE, id);
-        return client.get(request);
+        GetRequest request = new GetRequest(indexName, id);
+        return client.get(request, RequestOptions.DEFAULT);
     }
 
     public MultiGetResponse multiGet(String indexName, List<String> ids) throws IOException {
         final String newIndexName = formatIndexName(indexName);
         MultiGetRequest request = new MultiGetRequest();
-        ids.forEach(id -> request.add(newIndexName, TYPE, id));
-        return client.multiGet(request);
+        ids.forEach(id -> request.add(newIndexName, id));
+        return client.mget(request, RequestOptions.DEFAULT);
     }
 
     public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
         IndexRequest request = prepareInsert(indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.index(request);
+        client.index(request, RequestOptions.DEFAULT);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
         UpdateRequest request = prepareUpdate(indexName, id, source);
-        request.version(version);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request);
+        client.update(request, RequestOptions.DEFAULT);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
         UpdateRequest request = prepareUpdate(indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request);
+        client.update(request, RequestOptions.DEFAULT);
     }
 
     public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
         indexName = formatIndexName(indexName);
-        return new IndexRequest(indexName, TYPE, id).source(source);
+        return new IndexRequest(indexName).id(id).source(source);
     }
 
     public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
         indexName = formatIndexName(indexName);
-        return new UpdateRequest(indexName, TYPE, id).doc(source);
+        return new UpdateRequest(indexName, id).doc(source);
     }
 
-    public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
+    public long delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
         indexName = formatIndexName(indexName);
-        Map<String, String> params = Collections.singletonMap("conflicts", "proceed");
-        String jsonString = "{" +
-            "  \"query\": {" +
-            "    \"range\": {" +
-            "      \"" + timeBucketColumnName + "\": {" +
-            "        \"lte\": " + endTimeBucket +
-            "      }" +
-            "    }" +
-            "  }" +
-            "}";
-        HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
-        Response response = client.getLowLevelClient().performRequest(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
-        logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
-        return response.getStatusLine().getStatusCode();
+
+        DeleteByQueryRequest deleteByQueryRequest =
+            new DeleteByQueryRequest(indexName);
+
+        deleteByQueryRequest.setConflicts("proceed");
+        deleteByQueryRequest.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
+
+        BulkByScrollResponse bulkResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+        return bulkResponse.getStatus().getSuccessfullyProcessed();
     }
 
     public String formatIndexName(String indexName) {
@@ -271,7 +276,8 @@ public class ElasticSearchClient implements Client {
             }
         };
 
-        return BulkProcessor.builder(client::bulkAsync, listener)
+        return BulkProcessor.builder((request, bulkListener) ->
+            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
             .setBulkActions(bulkActions)
             .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
             .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index 44c6fdd..3279cb8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -21,7 +21,8 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -37,9 +38,9 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
     @Override
     public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
         ElasticSearchClient client = getClient();
-        int statusCode = client.delete(modelName, timeBucketColumnName, timeBucketBefore);
+        long processed = client.delete(modelName, timeBucketColumnName, timeBucketBefore);
         if (logger.isDebugEnabled()) {
-            logger.debug("Delete history from {} index, status code {}", client.formatIndexName(modelName), statusCode);
+            logger.debug("Delete history from {} index, processed {}", client.formatIndexName(modelName), processed);
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 47a7ad4..0b9a679 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -21,11 +21,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 import com.google.gson.JsonObject;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -101,12 +104,10 @@ public class StorageEsInstaller extends ModelInstaller {
 
     private JsonObject createMapping(Model tableDefine) {
         JsonObject mapping = new JsonObject();
-        mapping.add(ElasticSearchClient.TYPE, new JsonObject());
-
-        JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
+        mapping.add("properties", new JsonObject());
 
         JsonObject properties = new JsonObject();
-        type.add("properties", properties);
+        mapping.add("properties", properties);
 
         for (ModelColumn columnDefine : tableDefine.getColumns()) {
             if (columnDefine.isMatchQuery()) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
index 87e233f..98bf9b4 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
@@ -25,8 +25,10 @@ import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.common.xcontent.*;
-import org.slf4j.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -74,12 +76,8 @@ public class RegisterLockInstaller {
         settings.addProperty("index.refresh_interval", "1s");
 
         JsonObject mapping = new JsonObject();
-        mapping.add(ElasticSearchClient.TYPE, new JsonObject());
-
-        JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
-
         JsonObject properties = new JsonObject();
-        type.add("properties", properties);
+        mapping.add("properties", properties);
 
         JsonObject column = new JsonObject();
         column.addProperty("type", "integer");