You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/03/21 22:03:55 UTC
[incubator-skywalking] branch master updated: Support ES type/table
namespace feature (#956)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 0e60d40 Support ES type/table namespace feature (#956)
0e60d40 is described below
commit 0e60d40d0121c5d57b6fe1c51dbddb982daf44c7
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Mar 22 06:03:51 2018 +0800
Support ES type/table namespace feature (#956)
* Proposal the code structure for table name prefix. Related to #932
* Fix style check.
* Keep ESDao table name in control.
* Finish all codes about ES namespace feature.
---
.../client/elasticsearch/ElasticSearchClient.java | 96 +++++++++---
.../collector-storage-es-provider/pom.xml | 5 +
.../storage/es/StorageModuleEsProvider.java | 162 +++++----------------
.../es/base/dao/AbstractPersistenceEsDAO.java | 23 +--
.../es/dao/GlobalTraceEsPersistenceDAO.java | 21 ++-
.../es/dao/SegmentDurationEsPersistenceDAO.java | 21 ++-
.../storage/es/dao/SegmentEsPersistenceDAO.java | 21 ++-
.../alarm/ApplicationAlarmEsPersistenceDAO.java | 35 +++--
.../ApplicationReferenceAlarmEsPersistenceDAO.java | 33 +++--
...licationReferenceAlarmListEsPersistenceDAO.java | 33 +++--
.../dao/alarm/InstanceAlarmEsPersistenceDAO.java | 33 +++--
.../alarm/InstanceAlarmListEsPersistenceDAO.java | 33 +++--
.../InstanceReferenceAlarmEsPersistenceDAO.java | 37 +++--
...InstanceReferenceAlarmListEsPersistenceDAO.java | 37 +++--
.../es/dao/alarm/ServiceAlarmEsPersistenceDAO.java | 35 +++--
.../alarm/ServiceAlarmListEsPersistenceDAO.java | 35 +++--
.../ServiceReferenceAlarmEsPersistenceDAO.java | 45 +++---
.../ServiceReferenceAlarmListEsPersistenceDAO.java | 45 +++---
.../AbstractMemoryPoolMetricEsPersistenceDAO.java | 32 ++--
.../es/dao/register/InstanceRegisterEsDAO.java | 32 ++--
.../storage/es/dao/ui/CpuMetricEsUIDAO.java | 25 ++--
.../storage/es/dao/ui/GCMetricEsUIDAO.java | 23 +--
.../storage/es/dao/ui/InstanceMetricEsUIDAO.java | 46 +++---
.../storage/es/dao/ui/MemoryMetricEsUIDAO.java | 33 +++--
.../storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java | 44 ++----
.../storage/es/dao/ui/ServiceMetricEsUIDAO.java | 80 +++++-----
26 files changed, 554 insertions(+), 511 deletions(-)
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index f639f81..d58b955 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -19,12 +19,9 @@
package org.apache.skywalking.apm.collector.client.elasticsearch;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
+import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.ClientException;
+import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
@@ -33,19 +30,25 @@ import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.apache.skywalking.apm.collector.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+
/**
* @author peng-yongsheng
*/
@@ -55,37 +58,42 @@ public class ElasticSearchClient implements Client {
private org.elasticsearch.client.Client client;
+ private final String namespace;
+
private final String clusterName;
private final Boolean clusterTransportSniffer;
private final String clusterNodes;
- public ElasticSearchClient(String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
+ public ElasticSearchClient(String namespace, String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
+ this.namespace = namespace;
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
this.clusterNodes = clusterNodes;
}
- @Override public void initialize() throws ClientException {
+ @Override
+ public void initialize() throws ClientException {
Settings settings = Settings.builder()
- .put("cluster.name", clusterName)
- .put("client.transport.sniff", clusterTransportSniffer)
- .build();
+ .put("cluster.name", clusterName)
+ .put("client.transport.sniff", clusterTransportSniffer)
+ .build();
client = new PreBuiltTransportClient(settings);
List<AddressPairs> pairsList = parseClusterNodes(clusterNodes);
for (AddressPairs pairs : pairsList) {
try {
- ((PreBuiltTransportClient)client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
+ ((PreBuiltTransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
} catch (UnknownHostException e) {
throw new ElasticSearchClientException(e.getMessage(), e);
}
}
}
- @Override public void shutdown() {
+ @Override
+ public void shutdown() {
}
@@ -114,12 +122,14 @@ public class ElasticSearchClient implements Client {
public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) {
IndicesAdminClient adminClient = client.admin().indices();
+ indexName = formatIndexName(indexName);
CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get();
logger.info("create {} index with type of {} finished, isAcknowledged: {}", indexName, indexType, response.isAcknowledged());
return response.isShardsAcked();
}
public boolean deleteIndex(String indexName) {
+ indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
DeleteIndexResponse response = adminClient.prepareDelete(indexName).get();
logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
@@ -127,44 +137,84 @@ public class ElasticSearchClient implements Client {
}
public boolean isExistsIndex(String indexName) {
+ indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
return response.isExists();
}
public SearchRequestBuilder prepareSearch(String indexName) {
+ indexName = formatIndexName(indexName);
return client.prepareSearch(indexName);
}
public IndexRequestBuilder prepareIndex(String indexName, String id) {
+ indexName = formatIndexName(indexName);
return client.prepareIndex(indexName, "type", id);
}
public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
+ indexName = formatIndexName(indexName);
return client.prepareUpdate(indexName, "type", id);
}
public GetRequestBuilder prepareGet(String indexName, String id) {
+ indexName = formatIndexName(indexName);
return client.prepareGet(indexName, "type", id);
}
- public DeleteByQueryRequestBuilder prepareDelete() {
- return DeleteByQueryAction.INSTANCE.newRequestBuilder(client);
+ public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) {
+ indexName = formatIndexName(indexName);
+ return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
}
- public MultiGetRequestBuilder prepareMultiGet() {
- return client.prepareMultiGet();
+ public MultiGetRequestBuilder prepareMultiGet(List<?> rows, MultiGetRowHandler rowHandler) {
+ MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
+ rowHandler.setPrepareMultiGet(prepareMultiGet);
+ rowHandler.setNamespace(namespace);
+
+ rows.forEach(row -> {
+ rowHandler.accept(row);
+ });
+
+ return rowHandler.getPrepareMultiGet();
}
+ public abstract static class MultiGetRowHandler<T> implements Consumer<T> {
+ private MultiGetRequestBuilder prepareMultiGet;
+ private String namespace;
+
+ public void setPrepareMultiGet(MultiGetRequestBuilder prepareMultiGet) {
+ this.prepareMultiGet = prepareMultiGet;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public void add(String indexName, @Nullable String type, String id) {
+ indexName = formatIndexName(namespace, indexName);
+ prepareMultiGet = prepareMultiGet.add(indexName, type, id);
+ }
+
+ private MultiGetRequestBuilder getPrepareMultiGet() {
+ return prepareMultiGet;
+ }
+ }
+
+
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
- public void update(UpdateRequest updateRequest) {
- try {
- client.update(updateRequest).get();
- } catch (InterruptedException | ExecutionException e) {
- logger.error(e.getMessage(), e);
+ private String formatIndexName(String indexName) {
+ return formatIndexName(this.namespace, indexName);
+ }
+
+ private static String formatIndexName(String namespace, String indexName) {
+ if (StringUtils.isNotEmpty(namespace)) {
+ return namespace + "_" + indexName;
}
+ return indexName;
}
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/pom.xml b/apm-collector/apm-collector-storage/collector-storage-es-provider/pom.xml
index efd4772..0aa276a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/pom.xml
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/pom.xml
@@ -39,5 +39,10 @@
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>collector-configuration-define</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
index 0e9cb27..ff52285 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es;
-import java.util.Properties;
-import java.util.UUID;
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
+import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
+import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfig;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
@@ -39,21 +39,7 @@ import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponent
import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentHourPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMinutePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMonthPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListDayPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListHourPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListMinutePersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListMonthPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.dao.alarm.*;
import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationMinuteMetricPersistenceDAO;
@@ -70,16 +56,8 @@ import org.apache.skywalking.apm.collector.storage.dao.cache.IApplicationCacheDA
import org.apache.skywalking.apm.collector.storage.dao.cache.IInstanceCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.cache.INetworkAddressCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.cache.IServiceNameCacheDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuDayMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuHourMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMinuteMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuSecondMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCDayMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCHourMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMinuteMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCSecondMetricPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.dao.cpu.*;
+import org.apache.skywalking.apm.collector.storage.dao.gc.*;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO;
@@ -92,16 +70,8 @@ import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceDa
import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryDayMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryHourMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMinuteMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemorySecondMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolDayMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolHourMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMinuteMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolSecondMetricPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.dao.memory.*;
+import org.apache.skywalking.apm.collector.storage.dao.mpool.*;
import org.apache.skywalking.apm.collector.storage.dao.register.IApplicationRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO;
@@ -114,27 +84,7 @@ import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceDay
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationAlarmListUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationAlarmUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationComponentUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMappingUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationReferenceMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.ICpuMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IGlobalTraceUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceAlarmUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryPoolMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.INetworkAddressUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentDurationUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceAlarmUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceReferenceMetricUIDAO;
+import org.apache.skywalking.apm.collector.storage.dao.ui.*;
import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
import org.apache.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO;
@@ -145,21 +95,7 @@ import org.apache.skywalking.apm.collector.storage.es.dao.acp.ApplicationCompone
import org.apache.skywalking.apm.collector.storage.es.dao.acp.ApplicationComponentHourEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.acp.ApplicationComponentMinuteEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.acp.ApplicationComponentMonthEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmListEsDayPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmListEsHourPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmListEsMinutePersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmListEsMonthPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationReferenceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationReferenceAlarmListEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.InstanceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.InstanceAlarmListEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.InstanceReferenceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.InstanceReferenceAlarmListEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ServiceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ServiceAlarmListEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ServiceReferenceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ServiceReferenceAlarmListEsPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.es.dao.alarm.*;
import org.apache.skywalking.apm.collector.storage.es.dao.amp.ApplicationDayMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.amp.ApplicationHourMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.amp.ApplicationMinuteMetricEsPersistenceDAO;
@@ -176,16 +112,8 @@ import org.apache.skywalking.apm.collector.storage.es.dao.cache.ApplicationEsCac
import org.apache.skywalking.apm.collector.storage.es.dao.cache.InstanceEsCacheDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.cache.NetworkAddressEsCacheDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.cache.ServiceNameEsCacheDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuDayMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuHourMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuMinuteMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuSecondMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCDayMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCHourMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCMinuteMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCSecondMetricEsPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.es.dao.cpu.*;
+import org.apache.skywalking.apm.collector.storage.es.dao.gc.*;
import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceDayMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceHourMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceMinuteMetricEsPersistenceDAO;
@@ -198,16 +126,8 @@ import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReference
import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReferenceHourMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReferenceMinuteMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReferenceMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemoryDayMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemoryHourMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemoryMinuteMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemoryMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemorySecondMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolDayMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolHourMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolMinuteMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolSecondMetricEsPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.es.dao.memory.*;
+import org.apache.skywalking.apm.collector.storage.es.dao.mpool.*;
import org.apache.skywalking.apm.collector.storage.es.dao.register.ApplicationRegisterEsDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.register.InstanceRegisterEsDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.register.NetworkAddressRegisterEsDAO;
@@ -220,30 +140,13 @@ import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceD
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceHourMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMinuteMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationAlarmEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationAlarmListEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationComponentEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationMappingEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationReferenceMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.CpuMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.GCMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.GlobalTraceEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.InstanceAlarmEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.InstanceEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.InstanceMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.MemoryMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.MemoryPoolMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.NetworkAddressEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.SegmentDurationEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.SegmentEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceAlarmEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceNameServiceEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceReferenceEsMetricUIDAO;
+import org.apache.skywalking.apm.collector.storage.es.dao.ui.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Properties;
+import java.util.UUID;
+
/**
* @author peng-yongsheng
*/
@@ -262,19 +165,23 @@ public class StorageModuleEsProvider extends ModuleProvider {
private ElasticSearchClient elasticSearchClient;
private DataTTLKeeperTimer deleteTimer;
- @Override public String name() {
+ @Override
+ public String name() {
return NAME;
}
- @Override public Class<? extends Module> module() {
+ @Override
+ public Class<? extends Module> module() {
return StorageModule.class;
}
- @Override public void prepare(Properties config) throws ServiceNotProvidedException {
+ @Override
+ public void prepare(Properties config) throws ServiceNotProvidedException {
String clusterName = config.getProperty(CLUSTER_NAME);
- Boolean clusterTransportSniffer = (Boolean)config.get(CLUSTER_TRANSPORT_SNIFFER);
+ Boolean clusterTransportSniffer = (Boolean) config.get(CLUSTER_TRANSPORT_SNIFFER);
String clusterNodes = config.getProperty(CLUSTER_NODES);
- elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
+ String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
+ elasticSearchClient = new ElasticSearchClient(namespace, clusterName, clusterTransportSniffer, clusterNodes);
this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
registerCacheDAO();
@@ -284,9 +191,10 @@ public class StorageModuleEsProvider extends ModuleProvider {
registerAlarmDAO();
}
- @Override public void start(Properties config) throws ServiceNotProvidedException {
- Integer indexShardsNumber = (Integer)config.get(INDEX_SHARDS_NUMBER);
- Integer indexReplicasNumber = (Integer)config.get(INDEX_REPLICAS_NUMBER);
+ @Override
+ public void start(Properties config) throws ServiceNotProvidedException {
+ Integer indexShardsNumber = (Integer) config.get(INDEX_SHARDS_NUMBER);
+ Integer indexReplicasNumber = (Integer) config.get(INDEX_REPLICAS_NUMBER);
try {
elasticSearchClient.initialize();
@@ -304,16 +212,18 @@ public class StorageModuleEsProvider extends ModuleProvider {
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(namingListener);
- Integer beforeDay = (Integer)config.getOrDefault(TIME_TO_LIVE_OF_DATA, 3);
+ Integer beforeDay = (Integer) config.getOrDefault(TIME_TO_LIVE_OF_DATA, 3);
deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, uuId + 0, beforeDay);
}
- @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException {
deleteTimer.start();
}
- @Override public String[] requiredModules() {
- return new String[] {ClusterModule.NAME};
+ @Override
+ public String[] requiredModules() {
+ return new String[]{ClusterModule.NAME, ConfigurationModule.NAME};
}
private void registerCacheDAO() throws ServiceNotProvidedException {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
index e98d2b5..eb22185 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.base.dao;
-import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
@@ -31,6 +30,8 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -46,7 +47,8 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
protected abstract String tableName();
- @Override public final STREAM_DATA get(String id) {
+ @Override
+ public final STREAM_DATA get(String id) {
GetResponse getResponse = getClient().prepareGet(tableName(), id).get();
if (getResponse.isExists()) {
STREAM_DATA streamData = esDataToStreamData(getResponse.getSource());
@@ -59,25 +61,28 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
protected abstract Map<String, Object> esStreamDataToEsData(STREAM_DATA streamData);
- @Override public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
+ @Override
+ public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
Map<String, Object> source = esStreamDataToEsData(streamData);
return getClient().prepareIndex(tableName(), streamData.getId()).setSource(source);
}
- @Override public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
+ @Override
+ public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
Map<String, Object> source = esStreamDataToEsData(streamData);
return getClient().prepareUpdate(tableName(), streamData.getId()).setDoc(source);
}
protected abstract String timeBucketColumnNameForDelete();
- @Override public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket))
- .source(tableName())
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket),
+ tableName())
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, tableName());
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
index 2b9f23a..b134636 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
@@ -45,15 +46,18 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
super(client);
}
- @Override public GlobalTrace get(String id) {
+ @Override
+ public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
- @Override public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
Map<String, Object> source = new HashMap<>();
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getSegmentId());
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getGlobalTraceId());
@@ -62,13 +66,14 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getId()).setSource(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(GlobalTraceTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ GlobalTraceTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, GlobalTraceTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
index 611666e..363bbc0 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
@@ -44,15 +45,18 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
super(client);
}
- @Override public SegmentDuration get(String id) {
+ @Override
+ public SegmentDuration get(String id) {
return null;
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(SegmentDuration data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(SegmentDuration data) {
return null;
}
- @Override public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
logger.debug("segment cost prepareBatchInsert, getApplicationId: {}", data.getId());
Map<String, Object> source = new HashMap<>();
source.put(SegmentDurationTable.COLUMN_SEGMENT_ID, data.getSegmentId());
@@ -67,13 +71,14 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
return getClient().prepareIndex(SegmentDurationTable.TABLE, data.getId()).setSource(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(SegmentDurationTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(SegmentDurationTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(SegmentDurationTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ SegmentDurationTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentDurationTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
index 61bb6ad..31cda7e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
@@ -45,15 +46,18 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
super(client);
}
- @Override public Segment get(String id) {
+ @Override
+ public Segment get(String id) {
return null;
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
return null;
}
- @Override public IndexRequestBuilder prepareBatchInsert(Segment data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(Segment data) {
Map<String, Object> source = new HashMap<>();
source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBinary())));
source.put(SegmentTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
@@ -61,13 +65,14 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
return getClient().prepareIndex(SegmentTable.TABLE, data.getId()).setSource(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(SegmentTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ SegmentTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
index 4e3b058..d3d5603 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
@@ -34,6 +32,9 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -45,27 +46,29 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
super(client);
}
- @Override public ApplicationAlarm get(String id) {
+ @Override
+ public ApplicationAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(ApplicationAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationAlarm instanceAlarm = new ApplicationAlarm();
instanceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
- instanceAlarm.setApplicationId(((Number)source.get(ApplicationAlarmTable.COLUMN_APPLICATION_ID)).intValue());
- instanceAlarm.setSourceValue(((Number)source.get(ApplicationAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ instanceAlarm.setApplicationId(((Number) source.get(ApplicationAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+ instanceAlarm.setSourceValue(((Number) source.get(ApplicationAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
- instanceAlarm.setAlarmType(((Number)source.get(ApplicationAlarmTable.COLUMN_ALARM_TYPE)).intValue());
- instanceAlarm.setAlarmContent((String)source.get(ApplicationAlarmTable.COLUMN_ALARM_CONTENT));
+ instanceAlarm.setAlarmType(((Number) source.get(ApplicationAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ instanceAlarm.setAlarmContent((String) source.get(ApplicationAlarmTable.COLUMN_ALARM_CONTENT));
- instanceAlarm.setLastTimeBucket(((Number)source.get(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ instanceAlarm.setLastTimeBucket(((Number) source.get(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ApplicationAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ApplicationAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ApplicationAlarmTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
@@ -78,7 +81,8 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
return getClient().prepareIndex(ApplicationAlarmTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ApplicationAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ApplicationAlarmTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
@@ -91,13 +95,14 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
return getClient().prepareUpdate(ApplicationAlarmTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ApplicationAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ApplicationAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ApplicationAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
index 30cb14b..33c1111 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
@@ -45,27 +46,29 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
super(client);
}
- @Override public ApplicationReferenceAlarm get(String id) {
+ @Override
+ public ApplicationReferenceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(ApplicationReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationReferenceAlarm applicationReferenceAlarm = new ApplicationReferenceAlarm();
applicationReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
- applicationReferenceAlarm.setFrontApplicationId(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
- applicationReferenceAlarm.setBehindApplicationId(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
- applicationReferenceAlarm.setSourceValue(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ applicationReferenceAlarm.setFrontApplicationId(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ applicationReferenceAlarm.setBehindApplicationId(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ applicationReferenceAlarm.setSourceValue(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
- applicationReferenceAlarm.setAlarmType(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
- applicationReferenceAlarm.setAlarmContent((String)source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+ applicationReferenceAlarm.setAlarmType(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ applicationReferenceAlarm.setAlarmContent((String) source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_CONTENT));
- applicationReferenceAlarm.setLastTimeBucket(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ applicationReferenceAlarm.setLastTimeBucket(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return applicationReferenceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -79,7 +82,8 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
return getClient().prepareIndex(ApplicationReferenceAlarmTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -93,13 +97,14 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
return getClient().prepareUpdate(ApplicationReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ApplicationReferenceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ApplicationReferenceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ApplicationReferenceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
index 4b7469c..062c794 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
@@ -45,27 +46,29 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
super(client);
}
- @Override public ApplicationReferenceAlarmList get(String id) {
+ @Override
+ public ApplicationReferenceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(ApplicationReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationReferenceAlarmList applicationReferenceAlarmList = new ApplicationReferenceAlarmList();
applicationReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
- applicationReferenceAlarmList.setFrontApplicationId(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
- applicationReferenceAlarmList.setBehindApplicationId(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
- applicationReferenceAlarmList.setSourceValue(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+ applicationReferenceAlarmList.setFrontApplicationId(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ applicationReferenceAlarmList.setBehindApplicationId(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ applicationReferenceAlarmList.setSourceValue(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
- applicationReferenceAlarmList.setAlarmType(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
- applicationReferenceAlarmList.setAlarmContent((String)source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+ applicationReferenceAlarmList.setAlarmType(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ applicationReferenceAlarmList.setAlarmContent((String) source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
- applicationReferenceAlarmList.setTimeBucket(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ applicationReferenceAlarmList.setTimeBucket(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return applicationReferenceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarmList data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -79,7 +82,8 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
return getClient().prepareIndex(ApplicationReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarmList data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -93,13 +97,14 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
return getClient().prepareUpdate(ApplicationReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ApplicationReferenceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ApplicationReferenceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ApplicationReferenceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
index 64631c6..cec07f4 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
@@ -45,27 +46,29 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
super(client);
}
- @Override public InstanceAlarm get(String id) {
+ @Override
+ public InstanceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceAlarm instanceAlarm = new InstanceAlarm();
instanceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
- instanceAlarm.setApplicationId(((Number)source.get(InstanceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
- instanceAlarm.setInstanceId(((Number)source.get(InstanceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
- instanceAlarm.setSourceValue(((Number)source.get(InstanceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ instanceAlarm.setApplicationId(((Number) source.get(InstanceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+ instanceAlarm.setInstanceId(((Number) source.get(InstanceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
+ instanceAlarm.setSourceValue(((Number) source.get(InstanceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
- instanceAlarm.setAlarmType(((Number)source.get(InstanceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
- instanceAlarm.setAlarmContent((String)source.get(InstanceAlarmTable.COLUMN_ALARM_CONTENT));
+ instanceAlarm.setAlarmType(((Number) source.get(InstanceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ instanceAlarm.setAlarmContent((String) source.get(InstanceAlarmTable.COLUMN_ALARM_CONTENT));
- instanceAlarm.setLastTimeBucket(((Number)source.get(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ instanceAlarm.setLastTimeBucket(((Number) source.get(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(InstanceAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(InstanceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -79,7 +82,8 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
return getClient().prepareIndex(InstanceAlarmTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -93,13 +97,14 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
return getClient().prepareUpdate(InstanceAlarmTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(InstanceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ InstanceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
index 47cefae..ac350bd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
@@ -45,27 +46,29 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
super(client);
}
- @Override public InstanceAlarmList get(String id) {
+ @Override
+ public InstanceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceAlarmList instanceAlarmList = new InstanceAlarmList();
instanceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
- instanceAlarmList.setApplicationId(((Number)source.get(InstanceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
- instanceAlarmList.setInstanceId(((Number)source.get(InstanceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
- instanceAlarmList.setSourceValue(((Number)source.get(InstanceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+ instanceAlarmList.setApplicationId(((Number) source.get(InstanceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
+ instanceAlarmList.setInstanceId(((Number) source.get(InstanceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
+ instanceAlarmList.setSourceValue(((Number) source.get(InstanceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
- instanceAlarmList.setAlarmType(((Number)source.get(InstanceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
- instanceAlarmList.setAlarmContent((String)source.get(InstanceAlarmListTable.COLUMN_ALARM_CONTENT));
+ instanceAlarmList.setAlarmType(((Number) source.get(InstanceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ instanceAlarmList.setAlarmContent((String) source.get(InstanceAlarmListTable.COLUMN_ALARM_CONTENT));
- instanceAlarmList.setTimeBucket(((Number)source.get(InstanceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ instanceAlarmList.setTimeBucket(((Number) source.get(InstanceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return instanceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(InstanceAlarmList data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(InstanceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -79,7 +82,8 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
return getClient().prepareIndex(InstanceAlarmListTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarmList data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -93,13 +97,14 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
return getClient().prepareUpdate(InstanceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(InstanceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(InstanceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(InstanceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ InstanceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
index fd12f43..79d21da 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
@@ -45,29 +46,31 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
super(client);
}
- @Override public InstanceReferenceAlarm get(String id) {
+ @Override
+ public InstanceReferenceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceReferenceAlarm instanceReferenceAlarm = new InstanceReferenceAlarm();
instanceReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
- instanceReferenceAlarm.setFrontApplicationId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
- instanceReferenceAlarm.setBehindApplicationId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
- instanceReferenceAlarm.setFrontInstanceId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
- instanceReferenceAlarm.setBehindInstanceId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
- instanceReferenceAlarm.setSourceValue(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ instanceReferenceAlarm.setFrontApplicationId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ instanceReferenceAlarm.setBehindApplicationId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ instanceReferenceAlarm.setFrontInstanceId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+ instanceReferenceAlarm.setBehindInstanceId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+ instanceReferenceAlarm.setSourceValue(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
- instanceReferenceAlarm.setAlarmType(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
- instanceReferenceAlarm.setAlarmContent((String)source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+ instanceReferenceAlarm.setAlarmType(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ instanceReferenceAlarm.setAlarmContent((String) source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
- instanceReferenceAlarm.setLastTimeBucket(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ instanceReferenceAlarm.setLastTimeBucket(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceReferenceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -83,7 +86,8 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
return getClient().prepareIndex(InstanceReferenceAlarmTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -99,13 +103,14 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
return getClient().prepareUpdate(InstanceReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(InstanceReferenceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ InstanceReferenceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceReferenceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
index 7738849..2c93cbb 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
@@ -45,29 +46,31 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
super(client);
}
- @Override public InstanceReferenceAlarmList get(String id) {
+ @Override
+ public InstanceReferenceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceReferenceAlarmList serviceReferenceAlarmList = new InstanceReferenceAlarmList();
serviceReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
- serviceReferenceAlarmList.setFrontApplicationId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
- serviceReferenceAlarmList.setBehindApplicationId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
- serviceReferenceAlarmList.setFrontInstanceId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
- serviceReferenceAlarmList.setBehindInstanceId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
- serviceReferenceAlarmList.setSourceValue(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+ serviceReferenceAlarmList.setFrontApplicationId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ serviceReferenceAlarmList.setBehindApplicationId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ serviceReferenceAlarmList.setFrontInstanceId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+ serviceReferenceAlarmList.setBehindInstanceId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+ serviceReferenceAlarmList.setSourceValue(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
- serviceReferenceAlarmList.setAlarmType(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
- serviceReferenceAlarmList.setAlarmContent((String)source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+ serviceReferenceAlarmList.setAlarmType(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceReferenceAlarmList.setAlarmContent((String) source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
- serviceReferenceAlarmList.setTimeBucket(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ serviceReferenceAlarmList.setTimeBucket(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReferenceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarmList data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -83,7 +86,8 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
return getClient().prepareIndex(InstanceReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarmList data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -99,13 +103,14 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
return getClient().prepareUpdate(InstanceReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(InstanceReferenceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ InstanceReferenceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceReferenceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
index 44d8265..a48a79d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
@@ -45,28 +46,30 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
super(client);
}
- @Override public ServiceAlarm get(String id) {
+ @Override
+ public ServiceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceAlarm serviceAlarm = new ServiceAlarm();
serviceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
- serviceAlarm.setApplicationId(((Number)source.get(ServiceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
- serviceAlarm.setInstanceId(((Number)source.get(ServiceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
- serviceAlarm.setServiceId(((Number)source.get(ServiceAlarmTable.COLUMN_SERVICE_ID)).intValue());
- serviceAlarm.setSourceValue(((Number)source.get(ServiceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ serviceAlarm.setApplicationId(((Number) source.get(ServiceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+ serviceAlarm.setInstanceId(((Number) source.get(ServiceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
+ serviceAlarm.setServiceId(((Number) source.get(ServiceAlarmTable.COLUMN_SERVICE_ID)).intValue());
+ serviceAlarm.setSourceValue(((Number) source.get(ServiceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
- serviceAlarm.setAlarmType(((Number)source.get(ServiceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
- serviceAlarm.setAlarmContent((String)source.get(ServiceAlarmTable.COLUMN_ALARM_CONTENT));
+ serviceAlarm.setAlarmType(((Number) source.get(ServiceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceAlarm.setAlarmContent((String) source.get(ServiceAlarmTable.COLUMN_ALARM_CONTENT));
- serviceAlarm.setLastTimeBucket(((Number)source.get(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ serviceAlarm.setLastTimeBucket(((Number) source.get(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return serviceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ServiceAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ServiceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -81,7 +84,8 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
return getClient().prepareIndex(ServiceAlarmTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -96,13 +100,14 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
return getClient().prepareUpdate(ServiceAlarmTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ServiceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ServiceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
index 56bf3c8..6cb8175 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
@@ -45,28 +46,30 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
super(client);
}
- @Override public ServiceAlarmList get(String id) {
+ @Override
+ public ServiceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceAlarmList serviceAlarmList = new ServiceAlarmList();
serviceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
- serviceAlarmList.setApplicationId(((Number)source.get(ServiceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
- serviceAlarmList.setInstanceId(((Number)source.get(ServiceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
- serviceAlarmList.setServiceId(((Number)source.get(ServiceAlarmListTable.COLUMN_SERVICE_ID)).intValue());
- serviceAlarmList.setSourceValue(((Number)source.get(ServiceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+ serviceAlarmList.setApplicationId(((Number) source.get(ServiceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
+ serviceAlarmList.setInstanceId(((Number) source.get(ServiceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
+ serviceAlarmList.setServiceId(((Number) source.get(ServiceAlarmListTable.COLUMN_SERVICE_ID)).intValue());
+ serviceAlarmList.setSourceValue(((Number) source.get(ServiceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
- serviceAlarmList.setAlarmType(((Number)source.get(ServiceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
- serviceAlarmList.setAlarmContent((String)source.get(ServiceAlarmListTable.COLUMN_ALARM_CONTENT));
+ serviceAlarmList.setAlarmType(((Number) source.get(ServiceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceAlarmList.setAlarmContent((String) source.get(ServiceAlarmListTable.COLUMN_ALARM_CONTENT));
- serviceAlarmList.setTimeBucket(((Number)source.get(ServiceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ serviceAlarmList.setTimeBucket(((Number) source.get(ServiceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ServiceAlarmList data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ServiceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -81,7 +84,8 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
return getClient().prepareIndex(ServiceAlarmListTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarmList data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -96,13 +100,14 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
return getClient().prepareUpdate(ServiceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(ServiceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ServiceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(ServiceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ServiceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
index 6934afc..9400024 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
@@ -45,31 +46,33 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
super(client);
}
- @Override public ServiceReferenceAlarm get(String id) {
+ @Override
+ public ServiceReferenceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceReferenceAlarm serviceReferenceAlarm = new ServiceReferenceAlarm();
serviceReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
- serviceReferenceAlarm.setFrontApplicationId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
- serviceReferenceAlarm.setBehindApplicationId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
- serviceReferenceAlarm.setFrontInstanceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
- serviceReferenceAlarm.setBehindInstanceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
- serviceReferenceAlarm.setFrontServiceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_SERVICE_ID)).intValue());
- serviceReferenceAlarm.setBehindServiceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
- serviceReferenceAlarm.setSourceValue(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
-
- serviceReferenceAlarm.setAlarmType(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
- serviceReferenceAlarm.setAlarmContent((String)source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
-
- serviceReferenceAlarm.setLastTimeBucket(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ serviceReferenceAlarm.setFrontApplicationId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ serviceReferenceAlarm.setBehindApplicationId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ serviceReferenceAlarm.setFrontInstanceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+ serviceReferenceAlarm.setBehindInstanceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+ serviceReferenceAlarm.setFrontServiceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_SERVICE_ID)).intValue());
+ serviceReferenceAlarm.setBehindServiceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
+ serviceReferenceAlarm.setSourceValue(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+
+ serviceReferenceAlarm.setAlarmType(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceReferenceAlarm.setAlarmContent((String) source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+
+ serviceReferenceAlarm.setLastTimeBucket(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return serviceReferenceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -87,7 +90,8 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
return getClient().prepareIndex(ServiceReferenceAlarmTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -105,13 +109,14 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
return getClient().prepareUpdate(ServiceReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ServiceReferenceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ServiceReferenceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
index b3a8e4e..3014ca1 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
@@ -45,31 +46,33 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
super(client);
}
- @Override public ServiceReferenceAlarmList get(String id) {
+ @Override
+ public ServiceReferenceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceReferenceAlarmList serviceReferenceAlarmList = new ServiceReferenceAlarmList();
serviceReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
- serviceReferenceAlarmList.setFrontApplicationId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
- serviceReferenceAlarmList.setBehindApplicationId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
- serviceReferenceAlarmList.setFrontInstanceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
- serviceReferenceAlarmList.setBehindInstanceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
- serviceReferenceAlarmList.setFrontServiceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_SERVICE_ID)).intValue());
- serviceReferenceAlarmList.setBehindServiceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
- serviceReferenceAlarmList.setSourceValue(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
-
- serviceReferenceAlarmList.setAlarmType(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
- serviceReferenceAlarmList.setAlarmContent((String)source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
-
- serviceReferenceAlarmList.setTimeBucket(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ serviceReferenceAlarmList.setFrontApplicationId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ serviceReferenceAlarmList.setBehindApplicationId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ serviceReferenceAlarmList.setFrontInstanceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+ serviceReferenceAlarmList.setBehindInstanceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+ serviceReferenceAlarmList.setFrontServiceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_SERVICE_ID)).intValue());
+ serviceReferenceAlarmList.setBehindServiceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
+ serviceReferenceAlarmList.setSourceValue(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+
+ serviceReferenceAlarmList.setAlarmType(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceReferenceAlarmList.setAlarmContent((String) source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+
+ serviceReferenceAlarmList.setTimeBucket(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReferenceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarmList data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -87,7 +90,8 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
return getClient().prepareIndex(ServiceReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarmList data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -105,13 +109,14 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
return getClient().prepareUpdate(ServiceReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
- .filter(QueryBuilders.rangeQuery(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ServiceReferenceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+ QueryBuilders.rangeQuery(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ServiceReferenceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
index 2d10fdf..4741f2e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
@@ -18,13 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.mpool;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -34,28 +35,31 @@ public abstract class AbstractMemoryPoolMetricEsPersistenceDAO extends AbstractP
super(client);
}
- @Override protected final String timeBucketColumnNameForDelete() {
+ @Override
+ protected final String timeBucketColumnNameForDelete() {
return MemoryPoolMetricTable.COLUMN_TIME_BUCKET;
}
- @Override protected final MemoryPoolMetric esDataToStreamData(Map<String, Object> source) {
+ @Override
+ protected final MemoryPoolMetric esDataToStreamData(Map<String, Object> source) {
MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetric();
- memoryPoolMetric.setMetricId((String)source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
+ memoryPoolMetric.setMetricId((String) source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
- memoryPoolMetric.setInstanceId(((Number)source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
- memoryPoolMetric.setPoolType(((Number)source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
+ memoryPoolMetric.setInstanceId(((Number) source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
+ memoryPoolMetric.setPoolType(((Number) source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
- memoryPoolMetric.setInit(((Number)source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
- memoryPoolMetric.setMax(((Number)source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
- memoryPoolMetric.setUsed(((Number)source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
- memoryPoolMetric.setCommitted(((Number)source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
- memoryPoolMetric.setTimes(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
+ memoryPoolMetric.setInit(((Number) source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
+ memoryPoolMetric.setMax(((Number) source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
+ memoryPoolMetric.setUsed(((Number) source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
+ memoryPoolMetric.setCommitted(((Number) source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
+ memoryPoolMetric.setTimes(((Number) source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
- memoryPoolMetric.setTimeBucket(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
+ memoryPoolMetric.setTimeBucket(((Number) source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
return memoryPoolMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric streamData) {
+ @Override
+ protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric streamData) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryPoolMetricTable.COLUMN_METRIC_ID, streamData.getMetricId());
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
index 074ec58..93a75dc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.register;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
@@ -28,10 +26,13 @@ import org.apache.skywalking.apm.collector.storage.table.register.Instance;
import org.apache.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -43,15 +44,18 @@ public class InstanceRegisterEsDAO extends EsDAO implements IInstanceRegisterDAO
super(client);
}
- @Override public int getMaxInstanceId() {
+ @Override
+ public int getMaxInstanceId() {
return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
- @Override public int getMinInstanceId() {
+ @Override
+ public int getMinInstanceId() {
return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
- @Override public void save(Instance instance) {
+ @Override
+ public void save(Instance instance) {
logger.debug("save instance register info, application getApplicationId: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap<>();
@@ -69,18 +73,14 @@ public class InstanceRegisterEsDAO extends EsDAO implements IInstanceRegisterDAO
logger.debug("save instance register info, application getApplicationId: {}, agentUUID: {}, status: {}", instance.getApplicationId(), instance.getAgentUUID(), response.status().name());
}
- @Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
- ElasticSearchClient client = getClient();
- UpdateRequest updateRequest = new UpdateRequest();
- updateRequest.index(InstanceTable.TABLE);
- updateRequest.type(InstanceTable.TABLE_TYPE);
- updateRequest.id(String.valueOf(instanceId));
- updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-
+ @Override
+ public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
+ UpdateRequestBuilder updateRequestBuilder = getClient().prepareUpdate(InstanceTable.TABLE, String.valueOf(instanceId));
+ updateRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartbeatTime));
+ updateRequestBuilder.setDoc(source);
- updateRequest.doc(source);
- client.update(updateRequest);
+ updateRequestBuilder.get();
}
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
index bd07a48..1d4b1ea 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import java.util.LinkedList;
-import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.ICpuMetricUIDAO;
@@ -32,6 +30,9 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* @author peng-yongsheng
*/
@@ -41,22 +42,26 @@ public class CpuMetricEsUIDAO extends EsDAO implements ICpuMetricUIDAO {
super(client);
}
- @Override public List<Integer> getCPUTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getCPUTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, CpuMetricTable.TABLE);
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId;
- prepareMultiGet.add(tableName, CpuMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId;
+ this.add(tableName, CpuMetricTable.TABLE_TYPE, id);
+ }
});
+
List<Integer> cpuTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- double cpuUsed = ((Number)response.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).doubleValue();
- long times = ((Number)response.getResponse().getSource().get(CpuMetricTable.COLUMN_TIMES)).longValue();
- cpuTrends.add((int)((cpuUsed / times) * 100));
+ double cpuUsed = ((Number) response.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).doubleValue();
+ long times = ((Number) response.getResponse().getSource().get(CpuMetricTable.COLUMN_TIMES)).longValue();
+ cpuTrends.add((int) ((cpuUsed / times) * 100));
} else {
cpuTrends.add(0);
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
index d983bcd..30db5bd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.LinkedList;
import java.util.List;
+
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO;
@@ -42,30 +43,34 @@ public class GCMetricEsUIDAO extends EsDAO implements IGCMetricUIDAO {
super(client);
}
- @Override public List<Integer> getYoungGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
+ @Override
+ public List<Integer> getYoungGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
return getGCTrend(instanceId, step, durationPoints, GCPhrase.NEW_VALUE);
}
- @Override public List<Integer> getOldGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
+ @Override
+ public List<Integer> getOldGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
return getGCTrend(instanceId, step, durationPoints, GCPhrase.OLD_VALUE);
}
private List<Integer> getGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints, int gcPhrase) {
String tableName = TimePyramidTableNameBuilder.build(step, GCMetricTable.TABLE);
- MultiGetRequestBuilder youngPrepareMultiGet = getClient().prepareMultiGet();
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + gcPhrase;
- youngPrepareMultiGet.add(tableName, GCMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder youngPrepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + gcPhrase;
+ add(tableName, GCMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> gcTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = youngPrepareMultiGet.get();
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
if (itemResponse.getResponse().isExists()) {
- long count = ((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).longValue();
- long times = ((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_TIMES)).intValue();
- gcTrends.add((int)(count / times));
+ long count = ((Number) itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).longValue();
+ long times = ((Number) itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_TIMES)).intValue();
+ gcTrends.add((int) (count / times));
} else {
gcTrends.add(0);
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
index 81cde47..dbac89c 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import java.util.LinkedList;
-import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
@@ -44,6 +42,9 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* @author peng-yongsheng
*/
@@ -53,8 +54,9 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
super(client);
}
- @Override public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket,
- long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) {
+ @Override
+ public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket,
+ long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
@@ -82,8 +84,8 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
instanceIdTerms.getBuckets().forEach(instanceIdTerm -> {
int instanceId = instanceIdTerm.getKeyAsNumber().intValue();
Sum callSum = instanceIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
- long calls = (long)callSum.getValue();
- int callsPerSec = (int)(secondBetween == 0 ? 0 : calls / secondBetween);
+ long calls = (long) callSum.getValue();
+ int callsPerSec = (int) (secondBetween == 0 ? 0 : calls / secondBetween);
AppServerInfo appServerInfo = new AppServerInfo();
appServerInfo.setId(instanceId);
@@ -103,13 +105,15 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
}
}
- @Override public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
-
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> throughputTrend = new LinkedList<>();
@@ -118,8 +122,8 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
for (int i = 0; i < multiGetResponse.getResponses().length; i++) {
MultiGetItemResponse response = multiGetResponse.getResponses()[i];
if (response.getResponse().isExists()) {
- long callTimes = ((Number)response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
- throughputTrend.add((int)(callTimes / durationPoints.get(i).getSecondsBetween()));
+ long callTimes = ((Number) response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ throughputTrend.add((int) (callTimes / durationPoints.get(i).getSecondsBetween()));
} else {
throughputTrend.add(0);
}
@@ -127,15 +131,19 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
return throughputTrend;
}
- @Override public List<Integer> getResponseTimeTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getResponseTimeTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
+ MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+ }
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, InstanceMetricTable.TABLE_TYPE, id);
});
+
List<Integer> responseTimeTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
index 6e5e891..fb30adf 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
import org.apache.skywalking.apm.collector.core.util.Const;
@@ -32,6 +31,8 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
+import java.util.List;
+
/**
* @author peng-yongsheng
*/
@@ -41,38 +42,42 @@ public class MemoryMetricEsUIDAO extends EsDAO implements IMemoryMetricUIDAO {
super(client);
}
- @Override public Trend getHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
+ @Override
+ public Trend getHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
return getMemoryTrend(instanceId, step, durationPoints, true);
}
- @Override public Trend getNoHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
+ @Override
+ public Trend getNoHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
return getMemoryTrend(instanceId, step, durationPoints, false);
}
private Trend getMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints,
- boolean isHeap) {
+ boolean isHeap) {
String tableName = TimePyramidTableNameBuilder.build(step, MemoryMetricTable.TABLE);
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + BooleanUtils.booleanToValue(isHeap);
+ add(tableName, MemoryMetricTable.TABLE_TYPE, id);
+ }
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + BooleanUtils.booleanToValue(isHeap);
- prepareMultiGet.add(tableName, MemoryMetricTable.TABLE_TYPE, id);
});
Trend trend = new Trend();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long max = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_MAX)).longValue();
- long used = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_USED)).longValue();
- long times = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_TIMES)).longValue();
+ long max = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_MAX)).longValue();
+ long used = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_USED)).longValue();
+ long times = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_TIMES)).longValue();
- trend.getMetrics().add((int)(used / times));
+ trend.getMetrics().add((int) (used / times));
if (max < 0) {
- trend.getMaxMetrics().add((int)(used / times));
+ trend.getMaxMetrics().add((int) (used / times));
} else {
- trend.getMaxMetrics().add((int)(max / times));
+ trend.getMaxMetrics().add((int) (max / times));
}
} else {
trend.getMetrics().add(0);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
index 2201b43..5f769b7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
@@ -18,17 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryPoolMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
/**
* @author peng-yongsheng
@@ -39,15 +36,16 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO implements IMemoryPoolMetricU
super(client);
}
- @Override public JsonObject getMetric(int instanceId, long timeBucket, int poolType) {
+ @Override
+ public JsonObject getMetric(int instanceId, long timeBucket, int poolType) {
String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType;
GetResponse getResponse = getClient().prepareGet(MemoryPoolMetricTable.TABLE, id).get();
JsonObject metric = new JsonObject();
if (getResponse.isExists()) {
- metric.addProperty("max", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
- metric.addProperty("init", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
- metric.addProperty("used", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
+ metric.addProperty("max", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
+ metric.addProperty("init", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
+ metric.addProperty("used", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
} else {
metric.addProperty("max", 0);
metric.addProperty("init", 0);
@@ -56,32 +54,8 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO implements IMemoryPoolMetricU
return metric;
}
- @Override public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket, int poolType) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
-
- long timeBucket = startTimeBucket;
- do {
-// timeBucket = TimeBucketUtils.INSTANCE.addSecondForSecondTimeBucket(TimeBucketUtils.TimeBucketType.SECOND, timeBucket, 1);
- String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType;
- prepareMultiGet.add(MemoryPoolMetricTable.TABLE, MemoryPoolMetricTable.TABLE_TYPE, id);
- }
- while (timeBucket <= endTimeBucket);
-
- JsonObject metric = new JsonObject();
- JsonArray usedMetric = new JsonArray();
- MultiGetResponse multiGetResponse = prepareMultiGet.get();
- for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
- if (response.getResponse().isExists()) {
- metric.addProperty("max", ((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
- metric.addProperty("init", ((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
- usedMetric.add(((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
- } else {
- metric.addProperty("max", 0);
- metric.addProperty("init", 0);
- usedMetric.add(0);
- }
- }
- metric.add("used", usedMetric);
- return metric;
+ @Override
+ public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket, int poolType) {
+ throw new UnexpectedException("Not implement methodø");
}
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
index 96761a1..34ae413 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
@@ -18,11 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
@@ -51,6 +46,8 @@ import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
+import java.util.*;
+
/**
* @author peng-yongsheng
*/
@@ -62,23 +59,24 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
@Override
public List<Integer> getServiceResponseTimeTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
-
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> trends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
- long errorCalls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
- long durationSum = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
- long errorDurationSum = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue();
- trends.add((int)((durationSum - errorDurationSum) / (calls - errorCalls)));
+ long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ long errorCalls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
+ long durationSum = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
+ long errorDurationSum = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue();
+ trends.add((int) ((durationSum - errorDurationSum) / (calls - errorCalls)));
} else {
trends.add(0);
}
@@ -86,13 +84,15 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
return trends;
}
- @Override public List<Integer> getServiceTPSTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getServiceTPSTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
-
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> trends = new LinkedList<>();
@@ -101,9 +101,9 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
int index = 0;
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
long secondBetween = durationPoints.get(index).getSecondsBetween();
- trends.add((int)(calls / secondBetween));
+ trends.add((int) (calls / secondBetween));
} else {
trends.add(0);
}
@@ -112,22 +112,24 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
return trends;
}
- @Override public List<Integer> getServiceSLATrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getServiceSLATrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
-
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> trends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
- long errorCalls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
- trends.add((int)(((calls - errorCalls) / calls)) * 10000);
+ long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ long errorCalls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
+ trends.add((int) (((calls - errorCalls) / calls)) * 10000);
} else {
trends.add(10000);
}
@@ -137,7 +139,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
@Override
public List<Node> getServicesMetric(Step step, long startTime, long endTime, MetricSource metricSource,
- Collection<Integer> serviceIds) {
+ Collection<Integer> serviceIds) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
@@ -169,8 +171,8 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
ServiceNode serviceNode = new ServiceNode();
serviceNode.setId(serviceId);
- serviceNode.setCalls((long)callsSum.getValue());
- serviceNode.setSla((int)(((callsSum.getValue() - errorCallsSum.getValue()) / callsSum.getValue()) * 10000));
+ serviceNode.setCalls((long) callsSum.getValue());
+ serviceNode.setSla((int) (((callsSum.getValue() - errorCallsSum.getValue()) / callsSum.getValue()) * 10000));
nodes.add(serviceNode);
});
return nodes;
@@ -178,7 +180,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
@Override
public List<ServiceMetric> getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
- Integer topN, MetricSource metricSource) {
+ Integer topN, MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
@@ -202,12 +204,12 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
Set<Integer> serviceIds = new HashSet<>();
List<ServiceMetric> serviceMetrics = new LinkedList<>();
for (SearchHit searchHit : searchHits) {
- int serviceId = ((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
+ int serviceId = ((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
if (!serviceIds.contains(serviceId)) {
ServiceMetric serviceMetric = new ServiceMetric();
serviceMetric.setId(serviceId);
- serviceMetric.setCalls(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
- serviceMetric.setAvgResponseTime(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
+ serviceMetric.setCalls(((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
+ serviceMetric.setAvgResponseTime(((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
serviceMetrics.add(serviceMetric);
serviceIds.add(serviceId);
--
To stop receiving notification emails like this one, please contact
wusheng@apache.org.