You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/02/15 12:45:52 UTC
[skywalking] 01/01: Support merging day/hour/minute metrics into
one index. Reduce the number of indexes by 50%.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch es-idx-merging
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 6dee4fab6668a1e18a43debc543bafc2d0a2d87b
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Feb 15 20:45:39 2020 +0800
Support merging day/hour/minute metrics into one index. Reduce the number of indexes by 50%.
---
dist-material/application.yml | 2 +
.../src/main/resources/application.yml | 2 +
.../client/elasticsearch/ElasticSearchClient.java | 89 ++++++-----
.../client/elasticsearch/IndexNameConverter.java | 26 ++++
.../elasticsearch/ITElasticSearchClient.java | 33 +++-
.../server/receiver/trace/mock/AgentDataMock.java | 105 +++++++++++++
.../server/receiver/trace/mock/RegisterMock.java | 113 ++++++++++++++
.../server/receiver/trace/mock/ServiceAMock.java | 118 +++++++++++++++
.../server/receiver/trace/mock/ServiceBMock.java | 166 +++++++++++++++++++++
.../server/receiver/trace/mock/ServiceCMock.java | 114 ++++++++++++++
.../receiver/trace/mock/UniqueIdBuilder.java | 39 +++++
.../StorageModuleElasticsearchConfig.java | 10 ++
.../StorageModuleElasticsearchProvider.java | 122 ++++++++++++---
.../StorageModuleElasticsearch7Provider.java | 71 +++++++--
.../client/ElasticSearch7Client.java | 65 +++++---
15 files changed, 977 insertions(+), 98 deletions(-)
diff --git a/dist-material/application.yml b/dist-material/application.yml
index 25e165a..e6bad28 100644
--- a/dist-material/application.yml
+++ b/dist-material/application.yml
@@ -82,6 +82,7 @@ storage:
# protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
# trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
+# enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and day metrics will be merged into the minute index.
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
@@ -104,6 +105,7 @@ storage:
# protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
# trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
+# enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and day metrics will be merged into the minute index.
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index faaa7e3..9f06f78 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -83,6 +83,7 @@ storage:
# #trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
+# enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and day metrics will be merged into the minute index.
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# # Those data TTL settings will override the same settings in core module.
@@ -104,6 +105,7 @@ storage:
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
+ enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and day metrics will be merged into the minute index.
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 01abbeb..0f47abe 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
+import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
@@ -83,30 +84,34 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * ElasticSearchClient connects to the ES server by using ES client APIs.
+ */
+@Slf4j
public class ElasticSearchClient implements Client {
-
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
-
public static final String TYPE = "type";
protected final String clusterNodes;
protected final String protocol;
private final String trustStorePath;
private final String trustStorePass;
- private final String namespace;
private final String user;
private final String password;
+ private final List<IndexNameConverter> indexNameConverters;
protected RestHighLevelClient client;
- public ElasticSearchClient(String clusterNodes, String protocol, String trustStorePath, String trustStorePass,
- String namespace, String user, String password) {
+ public ElasticSearchClient(String clusterNodes,
+ String protocol,
+ String trustStorePath,
+ String trustStorePass,
+ String user,
+ String password,
+ List<IndexNameConverter> indexNameConverters) {
this.clusterNodes = clusterNodes;
this.protocol = protocol;
- this.namespace = namespace;
this.user = user;
this.password = password;
+ this.indexNameConverters = indexNameConverters;
this.trustStorePath = trustStorePath;
this.trustStorePass = trustStorePass;
}
@@ -127,7 +132,9 @@ public class ElasticSearchClient implements Client {
if (StringUtil.isEmpty(trustStorePath)) {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
- .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+ .setHttpClientConfigCallback(
+ httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
+ credentialsProvider));
} else {
KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream is = Files.newInputStream(Paths.get(trustStorePath))) {
@@ -136,8 +143,10 @@ public class ElasticSearchClient implements Client {
SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
final SSLContext sslContext = sslBuilder.build();
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
- .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
- .setSSLContext(sslContext));
+ .setHttpClientConfigCallback(
+ httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
+ credentialsProvider)
+ .setSSLContext(sslContext));
}
} else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
@@ -153,7 +162,7 @@ public class ElasticSearchClient implements Client {
public static List<HttpHost> parseClusterNodes(String protocol, String nodes) {
List<HttpHost> httpHosts = new LinkedList<>();
- logger.info("elasticsearch cluster nodes: {}", nodes);
+ log.info("elasticsearch cluster nodes: {}", nodes);
List<String> nodesSplit = Splitter.on(",").omitEmptyStrings().splitToList(nodes);
for (String node : nodesSplit) {
@@ -170,19 +179,19 @@ public class ElasticSearchClient implements Client {
CreateIndexRequest request = new CreateIndexRequest(indexName);
CreateIndexResponse response = client.indices().create(request);
- logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+ log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean createIndex(String indexName, Map<String, Object> settings,
- Map<String, Object> mapping) throws IOException {
+ Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
Gson gson = new Gson();
request.settings(gson.toJson(settings), XContentType.JSON);
request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON);
CreateIndexResponse response = client.indices().create(request);
- logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+ log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
@@ -193,7 +202,7 @@ public class ElasticSearchClient implements Client {
Gson gson = new Gson();
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
JsonObject responseJson = gson.fromJson(reader, JsonObject.class);
- logger.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
+ log.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
return new ArrayList<>(responseJson.keySet());
}
return Collections.emptyList();
@@ -227,7 +236,7 @@ public class ElasticSearchClient implements Client {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
DeleteIndexResponse response;
response = client.indices().delete(request);
- logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+ log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
@@ -249,12 +258,13 @@ public class ElasticSearchClient implements Client {
} else if (statusCode == HttpStatus.SC_NOT_FOUND) {
return false;
} else {
- throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
+ throw new IOException(
+ "The response status code of template exists request should be 200 or 404, but it is " + statusCode);
}
}
public boolean createTemplate(String indexName, Map<String, Object> settings,
- Map<String, Object> mapping) throws IOException {
+ Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
String[] patterns = new String[] {indexName + "-*"};
@@ -271,7 +281,8 @@ public class ElasticSearchClient implements Client {
HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient()
- .performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
+ .performRequest(
+ HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
}
@@ -313,14 +324,16 @@ public class ElasticSearchClient implements Client {
}
public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
- org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
+ org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
+ indexName, id, source);
request.version(version);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request);
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
- org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
+ org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
+ indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request);
}
@@ -341,8 +354,9 @@ public class ElasticSearchClient implements Client {
String jsonString = "{" + " \"query\": {" + " \"range\": {" + " \"" + timeBucketColumnName + "\": {" + " \"lte\": " + endTimeBucket + " }" + " }" + " }" + "}";
HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient()
- .performRequest(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
- logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
+ .performRequest(
+ HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
+ log.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
return response.getStatusLine().getStatusCode();
}
@@ -353,9 +367,9 @@ public class ElasticSearchClient implements Client {
try {
int size = request.requests().size();
BulkResponse responses = client.bulk(request);
- logger.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
+ log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
} catch (IOException e) {
- logger.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
}
}
@@ -375,31 +389,34 @@ public class ElasticSearchClient implements Client {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
- logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
+ log.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
- logger.warn("Bulk [{}] executed with failures", executionId);
+ log.warn("Bulk [{}] executed with failures", executionId);
} else {
- logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook()
- .getMillis(), request
- .requests()
- .size());
+ log.info(
+ "Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook()
+ .getMillis(),
+ request
+ .requests()
+ .size()
+ );
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- logger.error("Failed to execute bulk", failure);
+ log.error("Failed to execute bulk", failure);
}
};
}
public String formatIndexName(String indexName) {
- if (StringUtil.isNotEmpty(namespace)) {
- return namespace + "_" + indexName;
+ for (final IndexNameConverter indexNameConverter : indexNameConverters) {
+ indexName = indexNameConverter.convert(indexName);
}
return indexName;
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexNameConverter.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexNameConverter.java
new file mode 100644
index 0000000..faeca02
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexNameConverter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+/**
+ * Implementations of this interface convert ElasticSearch index names.
+ */
+public interface IndexNameConverter {
+ String convert(String name);
+}
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
index 5f1abcc..c18bdff 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,7 +68,9 @@ public class ITElasticSearchClient {
public void before() throws Exception {
final String esAddress = System.getProperty("elastic.search.address");
final String esProtocol = System.getProperty("elastic.search.protocol");
- client = new ElasticSearchClient(esAddress, esProtocol, "", "", namespace, "test", "test");
+ client = new ElasticSearchClient(esAddress, esProtocol, "", "", "test", "test",
+ indexNameConverters(namespace)
+ );
client.connect();
}
@@ -270,12 +273,36 @@ public class ITElasticSearchClient {
index.add(oldIndexName.substring(namespacePrefix.length()), entry.getValue());
index.remove(oldIndexName);
} else {
- throw new RuntimeException("The indexName must contain the " + namespace + " prefix, but it is " + entry
- .getKey());
+ throw new RuntimeException(
+ "The indexName must contain the " + namespace + " prefix, but it is " + entry
+ .getKey());
}
});
logger.info("UndoFormatIndexName after " + index.toString());
}
return index;
}
+
+ private static List<IndexNameConverter> indexNameConverters(String namespace) {
+ List<IndexNameConverter> converters = new ArrayList<>();
+ converters.add(new NamespaceConverter(namespace));
+ return converters;
+ }
+
+ private static class NamespaceConverter implements IndexNameConverter {
+ private final String namespace;
+
+ public NamespaceConverter(final String namespace) {
+ this.namespace = namespace;
+ }
+
+ @Override
+ public String convert(final String indexName) {
+ if (StringUtil.isNotEmpty(namespace)) {
+ return namespace + "_" + indexName;
+ }
+
+ return indexName;
+ }
+ }
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
new file mode 100644
index 0000000..b53ca2c
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.network.common.Commands;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
+
+public class AgentDataMock {
+
+ private static boolean IS_COMPLETED = false;
+
+ public static void main(String[] args) throws InterruptedException {
+ ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext().build();
+
+ RegisterMock registerMock = new RegisterMock(channel);
+
+ StreamObserver<UpstreamSegment> streamObserver = createStreamObserver();
+
+ UniqueId.Builder globalTraceId = UniqueIdBuilder.INSTANCE.create();
+ long startTimestamp = System.currentTimeMillis();
+ //long startTimestamp = new DateTime().minusDays(2).getMillis();
+
+ // ServiceAMock
+ ServiceAMock serviceAMock = new ServiceAMock(registerMock);
+ serviceAMock.register();
+
+ // ServiceBMock
+ ServiceBMock serviceBMock = new ServiceBMock(registerMock);
+ serviceBMock.register();
+
+ // ServiceCMock
+ ServiceCMock serviceCMock = new ServiceCMock(registerMock);
+ serviceCMock.register();
+
+ UniqueId.Builder serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, true);
+
+ UniqueId.Builder serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, true);
+
+ UniqueId.Builder serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, true);
+
+ TimeUnit.SECONDS.sleep(10);
+
+ for (int i = 0; i < 500; i++) {
+ globalTraceId = UniqueIdBuilder.INSTANCE.create();
+ serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, true);
+ serviceBMock.mock(
+ streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, true);
+ serviceCMock.mock(
+ streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, true);
+ }
+
+ streamObserver.onCompleted();
+ while (!IS_COMPLETED) {
+ TimeUnit.MILLISECONDS.sleep(500);
+ }
+ }
+
+ private static StreamObserver<UpstreamSegment> createStreamObserver() {
+ ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext().build();
+ TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub stub = TraceSegmentReportServiceGrpc.newStub(
+ channel);
+ return stub.collect(new StreamObserver<Commands>() {
+ @Override
+ public void onNext(Commands downstream) {
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ IS_COMPLETED = true;
+ }
+ });
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java
new file mode 100644
index 0000000..ed9c94d
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import io.grpc.ManagedChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.network.common.KeyIntValuePair;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
+import org.apache.skywalking.apm.network.common.ServiceType;
+import org.apache.skywalking.apm.network.register.v2.RegisterGrpc;
+import org.apache.skywalking.apm.network.register.v2.Service;
+import org.apache.skywalking.apm.network.register.v2.ServiceInstance;
+import org.apache.skywalking.apm.network.register.v2.ServiceInstanceRegisterMapping;
+import org.apache.skywalking.apm.network.register.v2.ServiceInstances;
+import org.apache.skywalking.apm.network.register.v2.ServiceRegisterMapping;
+import org.apache.skywalking.apm.network.register.v2.Services;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author peng-yongsheng
+ */
+class RegisterMock {
+
+ private static final Logger logger = LoggerFactory.getLogger(RegisterMock.class);
+
+ private final RegisterGrpc.RegisterBlockingStub registerStub;
+
+ RegisterMock(ManagedChannel channel) {
+ registerStub = RegisterGrpc.newBlockingStub(channel);
+ }
+
+ int registerService(String serviceName) throws InterruptedException {
+ Services.Builder services = Services.newBuilder();
+ services.addServices(Service
+ .newBuilder()
+ .setServiceName(serviceName)
+ .setType(ServiceType.normal))
+ .build();
+
+ ServiceRegisterMapping serviceRegisterMapping;
+ int serviceId = 0;
+ do {
+ serviceRegisterMapping = registerStub.doServiceRegister(services.build());
+
+ List<KeyIntValuePair> servicesList = serviceRegisterMapping.getServicesList();
+ if (servicesList.size() > 0) {
+ serviceId = servicesList.get(0).getValue();
+ logger.debug("service id: {}", serviceId);
+ }
+
+ TimeUnit.MILLISECONDS.sleep(20);
+ }
+ while (serviceId == 0);
+
+ return serviceId;
+ }
+
+ int registerServiceInstance(int serviceId, String agentName) throws InterruptedException {
+ ServiceInstances.Builder instances = ServiceInstances.newBuilder();
+
+ instances.addInstances(ServiceInstance.newBuilder()
+ .setServiceId(serviceId)
+ .setInstanceUUID(agentName)
+ .setTime(System.currentTimeMillis())
+ .addAllProperties(buildOSInfo())
+ );
+
+ ServiceInstanceRegisterMapping instanceMapping;
+ int instanceId = 0;
+ do {
+ instanceMapping = registerStub.doServiceInstanceRegister(instances.build());
+ List<KeyIntValuePair> serviceInstancesList = instanceMapping.getServiceInstancesList();
+ if (serviceInstancesList.size() > 0) {
+ instanceId = serviceInstancesList.get(0).getValue();
+ logger.debug("instance id: {}", instanceId);
+ }
+ TimeUnit.MILLISECONDS.sleep(20);
+ }
+ while (instanceId == 0);
+
+ return instanceId;
+ }
+
+ public static List<KeyStringValuePair> buildOSInfo() {
+ List<KeyStringValuePair> osInfo = new ArrayList<KeyStringValuePair>();
+
+ osInfo.add(KeyStringValuePair.newBuilder().setKey("os_name").setValue("osName").build());
+ osInfo.add(KeyStringValuePair.newBuilder().setKey("host_name").setValue("hostName").build());
+ osInfo.add(KeyStringValuePair.newBuilder().setKey("ipv4").setValue("ipv4").build());
+ osInfo.add(KeyStringValuePair.newBuilder().setKey("process_no").setValue("123").build());
+ osInfo.add(KeyStringValuePair.newBuilder().setKey("language").setValue("java").build());
+ return osInfo;
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java
new file mode 100644
index 0000000..65ac83e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
+import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author peng-yongsheng
+ */
+class ServiceAMock {
+
+ static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest";
+ static String DUBBO_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()";
+ static String DUBBO_ADDRESS = "DubboIPAddress:1000";
+ private final RegisterMock registerMock;
+ private static int SERVICE_ID;
+ static int SERVICE_INSTANCE_ID;
+
+ ServiceAMock(RegisterMock registerMock) {
+ this.registerMock = registerMock;
+ }
+
+ void register() throws InterruptedException {
+ SERVICE_ID = registerMock.registerService("dubbox-consumer");
+ SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysA");
+ }
+
+ void mock(StreamObserver<UpstreamSegment> streamObserver, UniqueId.Builder traceId,
+ UniqueId.Builder segmentId, long startTimestamp, boolean isPrepare) {
+ UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
+ upstreamSegment.addGlobalTraceIds(traceId);
+ upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, isPrepare));
+
+ streamObserver.onNext(upstreamSegment.build());
+ }
+
+ private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId, boolean isPrepare) {
+ SegmentObject.Builder segment = SegmentObject.newBuilder();
+ segment.setTraceSegmentId(segmentId);
+ segment.setServiceId(SERVICE_ID);
+ segment.setServiceInstanceId(SERVICE_INSTANCE_ID);
+ segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
+ segment.addSpans(createLocalSpan(startTimestamp, isPrepare));
+ segment.addSpans(createExitSpan(startTimestamp, isPrepare));
+
+ return segment.build().toByteString();
+ }
+
+ private SpanObjectV2.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
+ SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+ span.setSpanId(0);
+ span.setSpanType(SpanType.Entry);
+ span.setSpanLayer(SpanLayer.Http);
+ span.setParentSpanId(-1);
+ span.setStartTime(startTimestamp);
+ span.setEndTime(startTimestamp + 6000);
+ span.setComponentId(ComponentsDefine.TOMCAT.getId());
+ if (isPrepare) {
+ span.setOperationName(REST_ENDPOINT);
+ } else {
+ span.setOperationNameId(2);
+ }
+ span.setIsError(false);
+ return span;
+ }
+
+ private SpanObjectV2.Builder createLocalSpan(long startTimestamp, boolean isPrepare) {
+ SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+ span.setSpanId(1);
+ span.setSpanType(SpanType.Local);
+ span.setParentSpanId(0);
+ span.setStartTime(startTimestamp + 100);
+ span.setEndTime(startTimestamp + 500);
+ span.setOperationName("org.apache.skywalking.Local.do");
+ span.setIsError(false);
+ return span;
+ }
+
+ private SpanObjectV2.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+ SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+ span.setSpanId(2);
+ span.setSpanType(SpanType.Exit);
+ span.setSpanLayer(SpanLayer.RPCFramework);
+ span.setParentSpanId(1);
+ span.setStartTime(startTimestamp + 120);
+ span.setEndTime(startTimestamp + 5800);
+ span.setComponentId(ComponentsDefine.DUBBO.getId());
+ span.setOperationName(DUBBO_ENDPOINT);
+ if (isPrepare) {
+ span.setPeer(DUBBO_ADDRESS);
+ } else {
+ span.setPeerId(2);
+ }
+ span.setIsError(false);
+ return span;
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
new file mode 100644
index 0000000..46aab9c
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
+import org.apache.skywalking.apm.network.language.agent.RefType;
+import org.apache.skywalking.apm.network.language.agent.SpanLayer;
+import org.apache.skywalking.apm.network.language.agent.SpanType;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentReference;
+import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * Test mock of the "dubbox-provider" service. Every {@link #mock} call sends one segment that holds a Dubbo entry
+ * span, a MongoDB exit span and a RocketMQ exit span.
+ *
+ * @author peng-yongsheng
+ */
+class ServiceBMock {
+
+    static String DUBBO_PROVIDER_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetServiceImpl.doBusiness()";
+    static String ROCKET_MQ_ENDPOINT = "org.apache.skywalking.RocketMQ";
+    static String ROCKET_MQ_ADDRESS = "RocketMQAddress:2000";
+    static int SERVICE_INSTANCE_ID;
+    private static int SERVICE_ID;
+
+    private final RegisterMock registerMock;
+
+    ServiceBMock(RegisterMock registerMock) {
+        this.registerMock = registerMock;
+    }
+
+    /**
+     * Registers the service and then its instance through the register mock, keeping both returned ids in the
+     * static fields read by the segment builders.
+     */
+    void register() throws InterruptedException {
+        SERVICE_ID = registerMock.registerService("dubbox-provider");
+        SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysB");
+    }
+
+    /**
+     * Wraps a freshly built segment into an {@link UpstreamSegment} and pushes it to the observer.
+     *
+     * @param streamObserver       receiver of the upstream segment
+     * @param traceId              global trace id added to the upstream segment
+     * @param segmentId            id of the segment created here
+     * @param parentTraceSegmentId id of the parent segment, used by the entry span reference
+     * @param startTimestamp       base timestamp all span times are offset from
+     * @param isPrepare            {@code true} to fill names/addresses, {@code false} to fill fixed numeric ids
+     */
+    void mock(StreamObserver<UpstreamSegment> streamObserver,
+              UniqueId.Builder traceId,
+              UniqueId.Builder segmentId,
+              UniqueId.Builder parentTraceSegmentId,
+              long startTimestamp,
+              boolean isPrepare) {
+        UpstreamSegment upstream = UpstreamSegment.newBuilder()
+                                                  .addGlobalTraceIds(traceId)
+                                                  .setSegment(createSegment(
+                                                      startTimestamp, segmentId, parentTraceSegmentId, isPrepare))
+                                                  .build();
+        streamObserver.onNext(upstream);
+    }
+
+    /**
+     * Serializes a segment carrying the entry span, the database exit span and the MQ exit span, in that order.
+     */
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
+                                     UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        return SegmentObject.newBuilder()
+                            .setTraceSegmentId(segmentId)
+                            .setServiceId(SERVICE_ID)
+                            .setServiceInstanceId(SERVICE_INSTANCE_ID)
+                            .addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare))
+                            .addSpans(createExitSpan(startTimestamp, isPrepare))
+                            .addSpans(createMQExitSpan(startTimestamp, isPrepare))
+                            .build()
+                            .toByteString();
+    }
+
+    /**
+     * Builds the cross-process reference of the entry span. Parent and entry instance are both
+     * {@code ServiceAMock.SERVICE_INSTANCE_ID}; the parent span id is 2.
+     */
+    private SegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        SegmentReference.Builder ref = SegmentReference.newBuilder()
+                                                       .setParentTraceSegmentId(parentTraceSegmentId)
+                                                       .setParentServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID)
+                                                       .setParentSpanId(2)
+                                                       .setEntryServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID)
+                                                       .setRefType(RefType.CrossProcess);
+        if (!isPrepare) {
+            // Non-prepare mode: endpoints and address are given by fixed numeric ids.
+            return ref.setParentEndpointId(2)
+                      .setNetworkAddressId(2)
+                      .setEntryEndpointId(2);
+        }
+        return ref.setParentEndpoint(ServiceAMock.REST_ENDPOINT)
+                  .setNetworkAddress(ServiceAMock.DUBBO_ADDRESS)
+                  .setEntryEndpoint(ServiceAMock.REST_ENDPOINT);
+    }
+
+    /**
+     * Builds the Dubbo entry span (span id 0, no parent) lasting from +500 to +5000 of the base timestamp.
+     */
+    private SpanObjectV2.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId, boolean isPrepare) {
+        SpanObjectV2.Builder entrySpan = SpanObjectV2.newBuilder()
+                                                     .setSpanId(0)
+                                                     .setParentSpanId(-1)
+                                                     .setSpanType(SpanType.Entry)
+                                                     .setSpanLayer(SpanLayer.RPCFramework)
+                                                     .setComponentId(ComponentsDefine.DUBBO.getId())
+                                                     .setStartTime(startTimestamp + 500)
+                                                     .setEndTime(startTimestamp + 5000)
+                                                     .setIsError(false)
+                                                     .addRefs(createReference(uniqueId, isPrepare));
+        return isPrepare
+            ? entrySpan.setOperationName(ServiceBMock.DUBBO_PROVIDER_ENDPOINT)
+            : entrySpan.setOperationNameId(4);
+    }
+
+    /**
+     * Builds the MongoDB exit span (span id 1, parent 0), flagged as an error and tagged with the statement and
+     * database type. It lasts from +550 to +1500 of the base timestamp.
+     */
+    private SpanObjectV2.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+        KeyStringValuePair statementTag = KeyStringValuePair.newBuilder()
+                                                            .setKey("db.statement")
+                                                            .setValue("select * from database where complex = 1;")
+                                                            .build();
+        KeyStringValuePair typeTag = KeyStringValuePair.newBuilder().setKey("db.type").setValue("mongodb").build();
+
+        SpanObjectV2.Builder dbSpan = SpanObjectV2.newBuilder()
+                                                  .setSpanId(1)
+                                                  .setParentSpanId(0)
+                                                  .setSpanType(SpanType.Exit)
+                                                  .setSpanLayer(SpanLayer.Database)
+                                                  .setComponentId(ComponentsDefine.MONGO_DRIVER.getId())
+                                                  .setStartTime(startTimestamp + 550)
+                                                  .setEndTime(startTimestamp + 1500)
+                                                  .setIsError(true)
+                                                  .addTags(statementTag)
+                                                  .addTags(typeTag)
+                                                  .setOperationName(
+                                                      "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
+        return isPrepare ? dbSpan.setPeer("localhost:27017") : dbSpan.setPeerId(3);
+    }
+
+    /**
+     * Builds the RocketMQ producer exit span (span id 2, parent 1) lasting from +1100 to +1500 of the base
+     * timestamp.
+     */
+    private SpanObjectV2.Builder createMQExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObjectV2.Builder mqSpan = SpanObjectV2.newBuilder()
+                                                  .setSpanId(2)
+                                                  .setParentSpanId(1)
+                                                  .setSpanType(SpanType.Exit)
+                                                  .setSpanLayer(SpanLayer.MQ)
+                                                  .setComponentId(ComponentsDefine.ROCKET_MQ_PRODUCER.getId())
+                                                  .setStartTime(startTimestamp + 1100)
+                                                  .setEndTime(startTimestamp + 1500)
+                                                  .setIsError(false)
+                                                  .setOperationName(ROCKET_MQ_ENDPOINT);
+        return isPrepare ? mqSpan.setPeer(ROCKET_MQ_ADDRESS) : mqSpan.setPeerId(4);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceCMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceCMock.java
new file mode 100644
index 0000000..17897c8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceCMock.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.language.agent.RefType;
+import org.apache.skywalking.apm.network.language.agent.SpanLayer;
+import org.apache.skywalking.apm.network.language.agent.SpanType;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentReference;
+import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * Test mock of the "rocket-mq-consumer" service. Every {@link #mock} call sends one segment that holds a single
+ * RocketMQ consumer entry span.
+ *
+ * @author peng-yongsheng
+ */
+class ServiceCMock {
+
+    private static int SERVICE_ID;
+    private static int SERVICE_INSTANCE_ID;
+
+    private final RegisterMock registerMock;
+
+    ServiceCMock(RegisterMock registerMock) {
+        this.registerMock = registerMock;
+    }
+
+    /**
+     * Registers the service and then its instance through the register mock, keeping both returned ids in the
+     * static fields read by the segment builder.
+     */
+    void register() throws InterruptedException {
+        SERVICE_ID = registerMock.registerService("rocket-mq-consumer");
+        SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysC");
+    }
+
+    /**
+     * Wraps a freshly built segment into an {@link UpstreamSegment} and pushes it to the observer.
+     *
+     * @param streamObserver       receiver of the upstream segment
+     * @param traceId              global trace id added to the upstream segment
+     * @param segmentId            id of the segment created here
+     * @param parentTraceSegmentId id of the parent segment, used by the entry span reference
+     * @param startTimestamp       base timestamp all span times are offset from
+     * @param isPrepare            {@code true} to fill names/addresses, {@code false} to fill fixed numeric ids
+     */
+    void mock(StreamObserver<UpstreamSegment> streamObserver,
+              UniqueId.Builder traceId,
+              UniqueId.Builder segmentId,
+              UniqueId.Builder parentTraceSegmentId,
+              long startTimestamp,
+              boolean isPrepare) {
+        UpstreamSegment upstream = UpstreamSegment.newBuilder()
+                                                  .addGlobalTraceIds(traceId)
+                                                  .setSegment(createSegment(
+                                                      startTimestamp, segmentId, parentTraceSegmentId, isPrepare))
+                                                  .build();
+        streamObserver.onNext(upstream);
+    }
+
+    /**
+     * Serializes a segment that carries only the entry span.
+     */
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
+                                     UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        return SegmentObject.newBuilder()
+                            .setTraceSegmentId(segmentId)
+                            .setServiceInstanceId(SERVICE_INSTANCE_ID)
+                            .setServiceId(SERVICE_ID)
+                            .addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare))
+                            .build()
+                            .toByteString();
+    }
+
+    /**
+     * Builds the RocketMQ consumer entry span (span id 0, no parent) lasting from +3000 to +5000 of the base
+     * timestamp.
+     */
+    private SpanObjectV2.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId, boolean isPrepare) {
+        SpanObjectV2.Builder entrySpan = SpanObjectV2.newBuilder()
+                                                     .setSpanId(0)
+                                                     .setParentSpanId(-1)
+                                                     .setSpanType(SpanType.Entry)
+                                                     .setSpanLayer(SpanLayer.MQ)
+                                                     .setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId())
+                                                     .setStartTime(startTimestamp + 3000)
+                                                     .setEndTime(startTimestamp + 5000)
+                                                     .setIsError(false)
+                                                     .addRefs(createReference(uniqueId, isPrepare));
+        return isPrepare
+            ? entrySpan.setOperationName(ServiceBMock.ROCKET_MQ_ENDPOINT)
+            : entrySpan.setOperationNameId(5);
+    }
+
+    /**
+     * Builds the cross-process reference of the entry span. The parent instance is
+     * {@code ServiceBMock.SERVICE_INSTANCE_ID}, the entry instance is {@code ServiceAMock.SERVICE_INSTANCE_ID}
+     * and the parent span id is 2.
+     */
+    private SegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        SegmentReference.Builder ref = SegmentReference.newBuilder()
+                                                       .setParentTraceSegmentId(parentTraceSegmentId)
+                                                       .setParentServiceInstanceId(ServiceBMock.SERVICE_INSTANCE_ID)
+                                                       .setParentSpanId(2)
+                                                       .setEntryServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID)
+                                                       .setRefType(RefType.CrossProcess);
+        if (!isPrepare) {
+            // Non-prepare mode: endpoints and address are given by fixed numeric ids.
+            return ref.setParentEndpointId(8)
+                      .setNetworkAddressId(4)
+                      .setEntryEndpointId(2);
+        }
+        return ref.setParentEndpoint(ServiceBMock.DUBBO_PROVIDER_ENDPOINT)
+                  .setNetworkAddress(ServiceBMock.ROCKET_MQ_ADDRESS)
+                  .setEntryEndpoint(ServiceAMock.REST_ENDPOINT);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
new file mode 100644
index 0000000..e64adff
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+
+/**
+ * Singleton factory of {@link UniqueId} builders for the trace mocks. Each id consists of three parts taken from
+ * one shared, monotonically increasing counter that starts at 1.
+ *
+ * @author peng-yongsheng
+ */
+public enum UniqueIdBuilder {
+    INSTANCE;
+
+    /** Number of id parts put into every generated {@link UniqueId}. */
+    private static final int ID_PART_COUNT = 3;
+
+    // The counter reference is never reassigned, so it is declared final.
+    private final AtomicLong idPart = new AtomicLong(1);
+
+    /**
+     * Creates a new id builder filled with the next {@value #ID_PART_COUNT} counter values.
+     *
+     * @return a builder holding three consecutive-at-draw-time counter values
+     */
+    UniqueId.Builder create() {
+        UniqueId.Builder uniqueId = UniqueId.newBuilder();
+        for (int i = 0; i < ID_PART_COUNT; i++) {
+            uniqueId.addIdParts(idPart.getAndIncrement());
+        }
+        return uniqueId;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index dd7de6a..2035bec 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -55,6 +55,16 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
@Getter
@Setter
String trustStorePass;
+ /**
+ * If this is ON, the downsampling indexes (hour and day precisions) are merged into the minute precision index.
+ * In this case, only {@link #minuteMetricsDataTTL} takes effect for the minute, hour and day metrics.
+ *
+ * @since 7.0.0 This is an enhancement. It reduces the number of indexes by 50% (day/hour indexes are no longer
+ * required) while keeping the performance nearly the same as before. The only side effect for 6.x storage is that
+ * the existing day/hour indexes remain, and users need to remove them manually.
+ */
+ @Getter
+ private boolean enablePackedDownsampling = true;
@Setter
private int resultWindowMaxSize = 10000;
@Setter
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index eaf115e..8983431 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -23,8 +23,14 @@ import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
@@ -48,6 +54,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
@@ -106,38 +113,54 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
- elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
- .getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
+ elasticSearchClient = new ElasticSearchClient(
+ config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
+ .getTrustStorePass(), config.getUser(), config.getPassword(),
+ indexNameConverters(config.getNameSpace(), config.isEnablePackedDownsampling())
+ );
- this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
- .getFlushInterval(), config.getConcurrentRequests()));
+ this.registerServiceImplementation(
+ IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
+ .getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient, new ElasticsearchStorageTTL()));
+ this.registerServiceImplementation(
+ IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient,
+ new ElasticsearchStorageTTL()
+ ));
- this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient, config
- .getResultWindowMaxSize()));
- this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient, config
- .getResultWindowMaxSize()));
- this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
- this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient, config
- .getResultWindowMaxSize()));
+ this.registerServiceImplementation(
+ IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient, config
+ .getResultWindowMaxSize()));
+ this.registerServiceImplementation(
+ IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient, config
+ .getResultWindowMaxSize()));
+ this.registerServiceImplementation(
+ IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(
+ INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient, config
+ .getResultWindowMaxSize()));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
- this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
+ this.registerServiceImplementation(
+ ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
+ this.registerServiceImplementation(
+ IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
- .getProfileTaskQueryMaxSize()));
- this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
- .getProfileTaskQueryMaxSize()));
- this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
- .getProfileTaskQueryMaxSize()));
+ this.registerServiceImplementation(
+ IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
+ .getProfileTaskQueryMaxSize()));
+ this.registerServiceImplementation(
+ IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
+ .getProfileTaskQueryMaxSize()));
+ this.registerServiceImplementation(
+ IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
+ .getProfileTaskQueryMaxSize()));
}
@Override
@@ -174,4 +197,63 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
configService.getDataTTLConfig().setDayMetricsDataTTL(config.getDayMetricsDataTTL());
configService.getDataTTLConfig().setMonthMetricsDataTTL(config.getMonthMetricsDataTTL());
}
+
+ /**
+  * Builds the ordered list of index name converters handed to the Elasticsearch client.
+  * <p>
+  * The packed downsampling converter is registered only when the feature is enabled. The namespace converter is
+  * always registered: the namespace is no longer passed to the client constructor, so skipping it here would
+  * silently drop the configured namespace prefix whenever packed downsampling is switched off.
+  *
+  * @param namespace                configured namespace, handed to the namespace converter
+  * @param enablePackedDownsampling whether the packed downsampling converter should be registered
+  * @return the converters, packed downsampling (if enabled) first, namespace last
+  */
+ public static List<IndexNameConverter> indexNameConverters(String namespace, boolean enablePackedDownsampling) {
+     List<IndexNameConverter> converters = new ArrayList<>();
+
+     if (enablePackedDownsampling) {
+         // Packed downsampling converter.
+         converters.add(new PackedDownsamplingConverter(enablePackedDownsampling));
+     }
+     // Namespace handling must not depend on the downsampling mode.
+     converters.add(new NamespaceConverter(namespace));
+     return converters;
+ }
+
+ /**
+  * Removes the day/hour downsampling marker from index names and caches the result per input name.
+  */
+ private static class PackedDownsamplingConverter implements IndexNameConverter {
+     // Stored from the constructor argument; convert() does not read it.
+     private final boolean enablePackedDownsampling;
+     // Markers to strip, tried in this order; only the first one that matches is removed.
+     private final String[] removableSuffixes = new String[] {
+         Const.ID_SPLIT + Downsampling.Day.getName(),
+         Const.ID_SPLIT + Downsampling.Hour.getName()
+     };
+     // Cache of already converted names, keyed by the original index name.
+     private final Map<String, String> convertedIndexNames = new ConcurrentHashMap<>();
+
+     public PackedDownsamplingConverter(final boolean enablePackedDownsampling) {
+         this.enablePackedDownsampling = enablePackedDownsampling;
+     }
+
+     /**
+      * Returns the index name without its day/hour marker, computing it once per distinct name.
+      *
+      * @param indexName original index name
+      * @return the name with the first matching marker removed, or the name itself when none matches
+      */
+     @Override
+     public String convert(final String indexName) {
+         // computeIfAbsent replaces the separate get/put pair with one atomic lookup-or-compute.
+         return convertedIndexNames.computeIfAbsent(indexName, this::removeDownsamplingMarker);
+     }
+
+     /**
+      * Strips every occurrence of the first marker that is present in the name.
+      */
+     private String removeDownsamplingMarker(final String indexName) {
+         for (final String removableSuffix : removableSuffixes) {
+             // Literal replacement: the marker is plain text and must not be interpreted as a regex.
+             final String stripped = indexName.replace(removableSuffix, "");
+             if (stripped.length() != indexName.length()) {
+                 return stripped;
+             }
+         }
+         return indexName;
+     }
+ }
+
+ /**
+  * Prefixes index names with the namespace and an underscore; names pass through unchanged when
+  * {@code StringUtil.isNotEmpty(namespace)} is false.
+  */
+ private static class NamespaceConverter implements IndexNameConverter {
+     private final String namespace;
+
+     public NamespaceConverter(final String namespace) {
+         this.namespace = namespace;
+     }
+
+     @Override
+     public String convert(final String indexName) {
+         return StringUtil.isNotEmpty(namespace) ? namespace + "_" + indexName : indexName;
+     }
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index 377ab26..99a96c4 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -76,6 +76,8 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.Metr
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.ProfileThreadSnapshotQueryEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.TraceQueryEs7DAO;
+import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider.indexNameConverters;
+
public class StorageModuleElasticsearch7Provider extends ModuleProvider {
protected final StorageModuleElasticsearch7Config config;
@@ -106,31 +108,68 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
- elasticSearch7Client = new ElasticSearch7Client(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
- .getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
-
- this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
+ elasticSearch7Client = new ElasticSearch7Client(
+ config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
+ .getTrustStorePass(), config.getUser(), config.getPassword(),
+ indexNameConverters(config.getNameSpace(), config.isEnablePackedDownsampling())
+ );
+
+ this.registerServiceImplementation(
+ IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(),
+ config.getFlushInterval(), config.getConcurrentRequests()
+ ));
this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockEs77DAOImpl(elasticSearch7Client));
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client, new ElasticsearchStorageTTL()));
-
- this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
- this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
- this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
- this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
+ this.registerServiceImplementation(
+ IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client,
+ new ElasticsearchStorageTTL()
+ ));
+
+ this.registerServiceImplementation(
+ IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(
+ elasticSearch7Client,
+ config.getResultWindowMaxSize()
+ ));
+ this.registerServiceImplementation(
+ IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(
+ elasticSearch7Client,
+ config.getResultWindowMaxSize()
+ ));
+ this.registerServiceImplementation(
+ IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
+ this.registerServiceImplementation(
+ INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(
+ elasticSearch7Client,
+ config.getResultWindowMaxSize()
+ ));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
- this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
- this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
- this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
+ this.registerServiceImplementation(
+ ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
+ this.registerServiceImplementation(
+ IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
+ this.registerServiceImplementation(
+ IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
- this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
- this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
- this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
+ this.registerServiceImplementation(
+ IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(
+ elasticSearch7Client,
+ config.getProfileTaskQueryMaxSize()
+ ));
+ this.registerServiceImplementation(
+ IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(
+ elasticSearch7Client,
+ config.getProfileTaskQueryMaxSize()
+ ));
+ this.registerServiceImplementation(
+ IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO(
+ elasticSearch7Client,
+ config.getProfileTaskQueryMaxSize()
+ ));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
index cd2003b..87b62d8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
@@ -18,9 +18,19 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
@@ -55,23 +65,21 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
public class ElasticSearch7Client extends ElasticSearchClient {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearch7Client.class);
- public ElasticSearch7Client(final String clusterNodes, final String protocol, final String trustStorePath,
- final String trustStorePass, final String namespace, final String user, final String password) {
- super(clusterNodes, protocol, trustStorePath, trustStorePass, namespace, user, password);
+ public ElasticSearch7Client(final String clusterNodes,
+ final String protocol,
+ final String trustStorePath,
+ final String trustStorePass,
+ final String user,
+ final String password,
+ List<IndexNameConverter> indexNameConverters) {
+ super(
+ clusterNodes, protocol, trustStorePath, trustStorePass, user, password,
+ indexNameConverters
+ );
}
@Override
@@ -91,7 +99,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
public boolean createIndex(String indexName, Map<String, Object> settings,
- Map<String, Object> mapping) throws IOException {
+ Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings);
@@ -138,11 +146,13 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
public boolean createTemplate(String indexName, Map<String, Object> settings,
- Map<String, Object> mapping) throws IOException {
+ Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
- PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexName).patterns(Collections.singletonList(indexName + "-*"))
- .alias(new Alias(indexName))
+ PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexName).patterns(
+ Collections.singletonList(indexName + "-*"))
+ .alias(new Alias(
+ indexName))
.settings(settings)
.mapping(mapping);
@@ -157,7 +167,8 @@ public class ElasticSearch7Client extends ElasticSearchClient {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(indexName);
AcknowledgedResponse acknowledgedResponse = client.indices()
- .deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT);
+ .deleteTemplate(
+ deleteIndexTemplateRequest, RequestOptions.DEFAULT);
return acknowledgedResponse.isAcknowledged();
}
@@ -190,8 +201,9 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
public void forceUpdate(String indexName, String id, XContentBuilder source, long seqNo,
- long primaryTerm) throws IOException {
- org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
+ long primaryTerm) throws IOException {
+ org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
+ indexName, id, source);
request.setIfSeqNo(seqNo);
request.setIfPrimaryTerm(primaryTerm);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -199,7 +211,8 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
- org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
+ org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
+ indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request, RequestOptions.DEFAULT);
}
@@ -221,7 +234,10 @@ public class ElasticSearch7Client extends ElasticSearchClient {
deleteByQueryRequest.setAbortOnVersionConflict(false);
deleteByQueryRequest.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
- logger.debug("delete indexName: {}, by query request: {}, response: {}", indexName, deleteByQueryRequest, bulkByScrollResponse);
+ logger.debug(
+ "delete indexName: {}, by query request: {}, response: {}", indexName, deleteByQueryRequest,
+ bulkByScrollResponse
+ );
return HttpStatus.SC_OK;
}
@@ -241,7 +257,10 @@ public class ElasticSearch7Client extends ElasticSearchClient {
public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = createBulkListener();
- return BulkProcessor.builder((bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener), listener)
+ return BulkProcessor.builder(
+ (bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT,
+ bulkResponseActionListener
+ ), listener)
.setBulkActions(bulkActions)
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
.setConcurrentRequests(concurrentRequests)