You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by pe...@apache.org on 2018/06/25 13:19:56 UTC
[incubator-skywalking] branch master updated: [Collector] Topology
query tuning, Batch Process instead of bulk. (#1384)
This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 5e03ec8 [Collector] Topology query tuning, Batch Process instead of bulk. (#1384)
5e03ec8 is described below
commit 5e03ec8845105fc2fb56ab680a8d838e108639c4
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Mon Jun 25 21:19:54 2018 +0800
[Collector] Topology query tuning, Batch Process instead of bulk. (#1384)
* #1202
1. Check whether DEBUG-level logging is enabled before printing a message.
2. Initialize the columns as a static attribute.
3. Topology build optimization: cache query results to avoid repeated queries.
* #1202
Add Elasticsearch batch process settings to application.yml.
* #1202
Fixed check style error.
* #1202
Use XContentFactory to build the source that is inserted into Elasticsearch.
---
.../handler/ApplicationRegisterServiceHandler.java | 4 +-
.../provider/handler/JVMMetricsServiceHandler.java | 21 ++--
.../NetworkAddressRegisterServiceHandler.java | 5 +-
.../handler/TraceSegmentServiceHandler.java | 5 +-
.../NetworkAddressRegisterServletHandler.java | 19 ++--
.../handler/TraceSegmentServletHandler.java | 20 ++--
.../jvm/provider/service/GCMetricService.java | 6 +-
.../provider/AnalysisMetricModuleProvider.java | 16 +--
.../provider/service/InstanceHeartBeatService.java | 4 +-
.../ApplicationComponentSpanListener.java | 2 +-
.../mapping/ApplicationMappingSpanListener.java | 15 ++-
.../worker/global/GlobalTraceSpanListener.java | 10 +-
.../mapping/InstanceMappingSpanListener.java | 15 ++-
.../segment/SegmentDurationSpanListener.java | 6 +-
.../heartbeat/ServiceNameAggregationWorker.java | 2 +-
.../ServiceNameHeartBeatPersistenceWorker.java | 2 +-
.../service/heartbeat/ServiceNameSpanListener.java | 2 +-
.../ServiceReferenceMetricSpanListener.java | 10 +-
.../register/ApplicationRegisterRemoteWorker.java | 17 ++-
.../register/ApplicationRegisterSerialWorker.java | 16 ++-
.../register/InstanceRegisterRemoteWorker.java | 16 ++-
.../register/InstanceRegisterSerialWorker.java | 16 ++-
.../NetworkAddressRegisterRemoteWorker.java | 17 ++-
.../NetworkAddressRegisterSerialWorker.java | 14 +--
.../register/ServiceNameRegisterSerialWorker.java | 5 +-
.../provider/service/InstanceIDService.java | 10 +-
.../provider/buffer/SegmentBufferManager.java | 5 +-
.../provider/buffer/SegmentBufferReader.java | 11 +-
.../parser/provider/parser/SegmentParse.java | 16 ++-
.../provider/parser/SegmentPersistenceWorker.java | 2 +-
.../parser/standardization/SpanIdExchanger.java | 12 ++-
.../provider/service/SegmentBase64Printer.java | 2 +-
.../base/AbstractLocalAsyncWorkerProvider.java | 2 +-
.../analysis/worker/model/base/AbstractWorker.java | 2 +-
.../worker/model/base/LocalAsyncWorkerRef.java | 8 +-
.../worker/model/base/RemoteWorkerRef.java | 2 +-
.../worker/model/impl/AggregationWorker.java | 21 ++--
.../worker/model/impl/MergePersistenceWorker.java | 14 ++-
.../model/impl/NonMergePersistenceWorker.java | 4 +-
.../worker/model/impl/PersistenceWorker.java | 2 +-
.../analysis/worker/timer/PersistenceTimer.java | 25 +++--
.../src/main/resources/application.yml | 8 +-
.../guava/service/ServiceIdCacheGuavaService.java | 2 +-
.../service/ServiceNameCacheGuavaService.java | 13 ++-
.../client/elasticsearch/ElasticSearchClient.java | 22 ++--
.../apm/collector/core/cache/Window.java | 2 +-
.../apm/collector/core/data/StreamData.java | 16 ++-
.../core/data/operator/NonMergeOperation.java | 1 +
.../apm/collector/instrument/MetricTree.java | 12 +--
.../collector/instrument/ServiceMetricTracing.java | 10 +-
.../instrument/tools/ReportFormatter.java | 27 ++---
.../service/selector/ForeverFirstSelector.java | 4 +-
.../storage/base/dao/IPersistenceDAO.java | 5 +-
.../storage/table/alarm/ApplicationAlarm.java | 5 +-
.../storage/table/alarm/ApplicationAlarmList.java | 5 +-
.../table/alarm/ApplicationReferenceAlarm.java | 5 +-
.../table/alarm/ApplicationReferenceAlarmList.java | 5 +-
.../storage/table/alarm/InstanceAlarm.java | 5 +-
.../storage/table/alarm/InstanceAlarmList.java | 5 +-
.../table/alarm/InstanceReferenceAlarm.java | 5 +-
.../table/alarm/InstanceReferenceAlarmList.java | 5 +-
.../storage/table/alarm/ServiceAlarm.java | 5 +-
.../storage/table/alarm/ServiceAlarmList.java | 5 +-
.../storage/table/alarm/ServiceReferenceAlarm.java | 5 +-
.../table/alarm/ServiceReferenceAlarmList.java | 5 +-
.../table/application/ApplicationComponent.java | 5 +-
.../table/application/ApplicationMapping.java | 5 +-
.../table/application/ApplicationMetric.java | 5 +-
.../application/ApplicationReferenceMetric.java | 5 +-
.../storage/table/global/GlobalTrace.java | 8 +-
.../table/global/ResponseTimeDistribution.java | 5 +-
.../storage/table/instance/InstanceMapping.java | 5 +-
.../storage/table/instance/InstanceMetric.java | 5 +-
.../table/instance/InstanceReferenceMetric.java | 5 +-
.../apm/collector/storage/table/jvm/GCMetric.java | 5 +-
.../collector/storage/table/jvm/MemoryMetric.java | 5 +-
.../storage/table/jvm/MemoryPoolMetric.java | 5 +-
.../storage/table/register/Application.java | 8 +-
.../collector/storage/table/register/Instance.java | 5 +-
.../storage/table/register/NetworkAddress.java | 9 +-
.../storage/table/register/ServiceName.java | 5 +-
.../collector/storage/table/segment/Segment.java | 8 +-
.../storage/table/service/ServiceMetric.java | 5 +-
.../table/service/ServiceReferenceMetric.java | 6 +-
.../collector/storage/es/MetricTransformUtil.java | 41 +++----
.../storage/es/StorageModuleEsConfig.java | 36 +++++++
.../storage/es/StorageModuleEsProvider.java | 4 +-
.../es/base/dao/AbstractPersistenceEsDAO.java | 12 ++-
.../collector/storage/es/base/dao/BatchEsDAO.java | 72 -------------
.../storage/es/base/dao/BatchProcessEsDAO.java | 120 +++++++++++++++++++++
.../es/dao/GlobalTraceEsPersistenceDAO.java | 17 +--
.../es/dao/InstanceHeartBeatEsPersistenceDAO.java | 12 ++-
.../es/dao/SegmentDurationEsPersistenceDAO.java | 30 +++---
.../storage/es/dao/SegmentEsPersistenceDAO.java | 12 ++-
.../dao/ServiceNameHeartBeatEsPersistenceDAO.java | 30 ++++--
...stractApplicationComponentEsPersistenceDAO.java | 21 ++--
...stractApplicationAlarmListEsPersistenceDAO.java | 24 ++---
.../alarm/ApplicationAlarmEsPersistenceDAO.java | 22 ++--
.../ApplicationReferenceAlarmEsPersistenceDAO.java | 24 ++---
...licationReferenceAlarmListEsPersistenceDAO.java | 25 ++---
.../dao/alarm/InstanceAlarmEsPersistenceDAO.java | 24 ++---
.../alarm/InstanceAlarmListEsPersistenceDAO.java | 24 ++---
.../InstanceReferenceAlarmEsPersistenceDAO.java | 28 ++---
...InstanceReferenceAlarmListEsPersistenceDAO.java | 28 ++---
.../es/dao/alarm/ServiceAlarmEsPersistenceDAO.java | 26 ++---
.../alarm/ServiceAlarmListEsPersistenceDAO.java | 26 ++---
.../ServiceReferenceAlarmEsPersistenceDAO.java | 32 +++---
.../ServiceReferenceAlarmListEsPersistenceDAO.java | 32 +++---
.../AbstractApplicationMetricEsPersistenceDAO.java | 25 ++---
...AbstractApplicationMappingEsPersistenceDAO.java | 21 ++--
...ApplicationReferenceMetricEsPersistenceDAO.java | 26 ++---
.../dao/cpu/AbstractCpuMetricEsPersistenceDAO.java | 23 ++--
.../dao/gc/AbstractGCMetricEsPersistenceDAO.java | 25 ++---
.../AbstractInstanceMetricEsPersistenceDAO.java | 17 +--
.../AbstractInstanceMappingEsPersistenceDAO.java | 23 ++--
...actInstanceReferenceMetricEsPersistenceDAO.java | 23 ++--
.../AbstractMemoryMetricEsPersistenceDAO.java | 31 +++---
.../AbstractMemoryPoolMetricEsPersistenceDAO.java | 32 +++---
...ctResponseTimeDistributionEsPersistenceDAO.java | 26 ++---
.../smp/AbstractServiceMetricEsPersistenceDAO.java | 19 ++--
...ractServiceReferenceMetricEsPersistenceDAO.java | 27 ++---
.../es/dao/ui/ApplicationComponentEsUIDAO.java | 2 +-
.../es/dao/ui/ApplicationMappingEsUIDAO.java | 2 +-
.../es/dao/ui/ApplicationMetricEsUIDAO.java | 2 +-
.../dao/ui/ApplicationReferenceMetricEsUIDAO.java | 2 +-
.../storage/es/dao/ui/InstanceEsUIDAO.java | 6 +-
.../collector/storage/h2/base/dao/BatchH2DAO.java | 12 ++-
.../collector/ui/jetty/handler/GraphQLHandler.java | 53 +++------
.../ui/service/ApplicationTopologyService.java | 2 +-
.../ui/service/ClusterTopologyService.java | 2 +-
.../apm/collector/ui/service/TopologyBuilder.java | 50 +++++----
.../collector/ui/service/TopologyBuilderTest.java | 3 +-
132 files changed, 1069 insertions(+), 801 deletions(-)
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/ApplicationRegisterServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/ApplicationRegisterServiceHandler.java
index 9ca2ca2..7d58e57 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/ApplicationRegisterServiceHandler.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/ApplicationRegisterServiceHandler.java
@@ -45,7 +45,9 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
@Override
public void applicationCodeRegister(Application request, StreamObserver<ApplicationMapping> responseObserver) {
- logger.debug("register application");
+ if (logger.isDebugEnabled()) {
+ logger.debug("register application");
+ }
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
String applicationCode = request.getApplicationCode();
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/JVMMetricsServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/JVMMetricsServiceHandler.java
index 05d728f..54dfb22 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/JVMMetricsServiceHandler.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/JVMMetricsServiceHandler.java
@@ -21,24 +21,14 @@ package org.apache.skywalking.apm.collector.agent.grpc.provider.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.skywalking.apm.collector.analysis.jvm.define.AnalysisJVMModule;
-import org.apache.skywalking.apm.collector.analysis.jvm.define.service.ICpuMetricService;
-import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IGCMetricService;
-import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryMetricService;
-import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryPoolMetricService;
+import org.apache.skywalking.apm.collector.analysis.jvm.define.service.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler;
-import org.apache.skywalking.apm.network.proto.CPU;
-import org.apache.skywalking.apm.network.proto.Downstream;
-import org.apache.skywalking.apm.network.proto.GC;
-import org.apache.skywalking.apm.network.proto.JVMMetrics;
-import org.apache.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
-import org.apache.skywalking.apm.network.proto.Memory;
-import org.apache.skywalking.apm.network.proto.MemoryPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.apm.network.proto.*;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -63,7 +53,10 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int instanceId = request.getApplicationInstanceId();
- logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
+ }
request.getMetricsList().forEach(metric -> {
long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getTime());
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/NetworkAddressRegisterServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/NetworkAddressRegisterServiceHandler.java
index d925d3d..35d46ba 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/NetworkAddressRegisterServiceHandler.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/NetworkAddressRegisterServiceHandler.java
@@ -46,7 +46,10 @@ public class NetworkAddressRegisterServiceHandler extends NetworkAddressRegister
@Override
public void batchRegister(NetworkAddresses request, StreamObserver<NetworkAddressMappings> responseObserver) {
- logger.debug("register application");
+ if (logger.isDebugEnabled()) {
+ logger.debug("register application");
+ }
+
ProtocolStringList addressesList = request.getAddressesList();
NetworkAddressMappings.Builder builder = NetworkAddressMappings.newBuilder();
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/TraceSegmentServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/TraceSegmentServiceHandler.java
index 4298e79..e8fd6dc 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/TraceSegmentServiceHandler.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/TraceSegmentServiceHandler.java
@@ -44,7 +44,10 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
- logger.debug("receive segment");
+ if (logger.isDebugEnabled()) {
+ logger.debug("receive segment");
+ }
+
segmentParseService.parse(segment, ISegmentParseService.Source.Agent);
if (debug) {
diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java
index 4fc8852..1da81e2 100644
--- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java
+++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java
@@ -18,19 +18,14 @@
package org.apache.skywalking.apm.collector.agent.jetty.provider.handler;
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
+import com.google.gson.*;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -52,17 +47,21 @@ public class NetworkAddressRegisterServletHandler extends JettyJsonHandler {
return "/networkAddress/register";
}
- @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
+ @Override protected JsonElement doGet(HttpServletRequest req) {
throw new UnsupportedOperationException();
}
- @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
+ @Override protected JsonElement doPost(HttpServletRequest req) {
JsonArray responseArray = new JsonArray();
try {
JsonArray networkAddresses = gson.fromJson(req.getReader(), JsonArray.class);
for (int i = 0; i < networkAddresses.size(); i++) {
String networkAddress = networkAddresses.get(i).getAsString();
- logger.debug("network address register, network address: {}", networkAddress);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("network address register, network address: {}", networkAddress);
+ }
+
int addressId = networkAddressIDService.get(networkAddress);
JsonObject mapping = new JsonObject();
mapping.addProperty(ADDRESS_ID, addressId);
diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java
index 169329b..83f9143 100644
--- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java
+++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java
@@ -20,18 +20,14 @@ package org.apache.skywalking.apm.collector.agent.jetty.provider.handler;
import com.google.gson.JsonElement;
import com.google.gson.stream.JsonReader;
-import java.io.BufferedReader;
-import java.io.IOException;
+import java.io.*;
import javax.servlet.http.HttpServletRequest;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.reader.TraceSegment;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.reader.TraceSegmentJsonReader;
+import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.reader.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -50,18 +46,22 @@ public class TraceSegmentServletHandler extends JettyJsonHandler {
return "/segments";
}
- @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
+ @Override protected JsonElement doGet(HttpServletRequest req) {
throw new UnsupportedOperationException();
}
- @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
- logger.debug("receive stream segment");
+ @Override protected JsonElement doPost(HttpServletRequest req) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("receive stream segment");
+ }
+
try {
BufferedReader bufferedReader = req.getReader();
read(bufferedReader);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
+
return null;
}
diff --git a/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/service/GCMetricService.java b/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/service/GCMetricService.java
index 9bc73d3..38c252f 100644
--- a/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/service/GCMetricService.java
+++ b/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/service/GCMetricService.java
@@ -34,7 +34,7 @@ import static java.util.Objects.isNull;
*/
public class GCMetricService implements IGCMetricService {
- private final Logger logger = LoggerFactory.getLogger(GCMetricService.class);
+ private static final Logger logger = LoggerFactory.getLogger(GCMetricService.class);
private Graph<GCMetric> gcMetricGraph;
@@ -59,7 +59,9 @@ public class GCMetricService implements IGCMetricService {
gcMetric.setTimes(1L);
gcMetric.setTimeBucket(timeBucket);
- logger.debug("push to gc metric graph, id: {}", gcMetric.getId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("push to gc metric graph, id: {}", gcMetric.getId());
+ }
getGcMetricGraph().start(gcMetric);
}
}
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
index 35db2d9..4258a28 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
@@ -100,14 +100,14 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
private void segmentParserListenerRegister() {
ISegmentParserListenerRegister segmentParserListenerRegister = getManager().find(AnalysisSegmentParserModule.NAME).getService(ISegmentParserListenerRegister.class);
- segmentParserListenerRegister.register(new ServiceReferenceMetricSpanListener.Factory());
- segmentParserListenerRegister.register(new ApplicationComponentSpanListener.Factory());
- segmentParserListenerRegister.register(new ApplicationMappingSpanListener.Factory());
- segmentParserListenerRegister.register(new InstanceMappingSpanListener.Factory());
- segmentParserListenerRegister.register(new GlobalTraceSpanListener.Factory());
- segmentParserListenerRegister.register(new SegmentDurationSpanListener.Factory());
- segmentParserListenerRegister.register(new ResponseTimeDistributionSpanListener.Factory());
- segmentParserListenerRegister.register(new ServiceNameSpanListener.Factory());
+ segmentParserListenerRegister.register(new ServiceReferenceMetricSpanListener.Factory()); //11000TPS
+ segmentParserListenerRegister.register(new ApplicationComponentSpanListener.Factory()); //17000TPS
+ segmentParserListenerRegister.register(new ApplicationMappingSpanListener.Factory()); //22000TPS
+ segmentParserListenerRegister.register(new InstanceMappingSpanListener.Factory()); //22000TPS
+ segmentParserListenerRegister.register(new GlobalTraceSpanListener.Factory()); //15000TPS
+ segmentParserListenerRegister.register(new SegmentDurationSpanListener.Factory()); //13000TPS
+ segmentParserListenerRegister.register(new ResponseTimeDistributionSpanListener.Factory()); //25000TPS
+ segmentParserListenerRegister.register(new ServiceNameSpanListener.Factory()); //20000TPS
}
private void graphCreate(WorkerCreateListener workerCreateListener) {
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/service/InstanceHeartBeatService.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/service/InstanceHeartBeatService.java
index 48bd92f..1409f02 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/service/InstanceHeartBeatService.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/service/InstanceHeartBeatService.java
@@ -51,7 +51,9 @@ public class InstanceHeartBeatService implements IInstanceHeartBeatService {
instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
instance.setInstanceId(instanceId);
- logger.debug("push to instance heart beat persistence worker, id: {}", instance.getId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("push to instance heart beat persistence worker, id: {}", instance.getId());
+ }
getHeartBeatGraph().start(instance);
}
}
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/component/ApplicationComponentSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/component/ApplicationComponentSpanListener.java
index dc095c5..4511305 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/component/ApplicationComponentSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/component/ApplicationComponentSpanListener.java
@@ -36,7 +36,7 @@ import org.apache.skywalking.apm.collector.storage.table.application.Application
public class ApplicationComponentSpanListener implements EntrySpanListener, ExitSpanListener {
private final ApplicationCacheService applicationCacheService;
- private List<ApplicationComponent> applicationComponents = new LinkedList<>();
+ private final List<ApplicationComponent> applicationComponents = new LinkedList<>();
private ApplicationComponentSpanListener(ModuleManager moduleManager) {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/mapping/ApplicationMappingSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/mapping/ApplicationMappingSpanListener.java
index f107ad7..c30a29a 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/mapping/ApplicationMappingSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/mapping/ApplicationMappingSpanListener.java
@@ -51,7 +51,10 @@ public class ApplicationMappingSpanListener implements EntrySpanListener {
}
@Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
- logger.debug("application mapping listener parse reference");
+ if (logger.isDebugEnabled()) {
+ logger.debug("application mapping listener parse reference");
+ }
+
if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
@@ -73,10 +76,16 @@ public class ApplicationMappingSpanListener implements EntrySpanListener {
}
@Override public void build() {
- logger.debug("application mapping listener build");
+ if (logger.isDebugEnabled()) {
+ logger.debug("application mapping listener build");
+ }
+
Graph<ApplicationMapping> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.APPLICATION_MAPPING_GRAPH_ID, ApplicationMapping.class);
applicationMappings.forEach(applicationMapping -> {
- logger.debug("push to application mapping aggregation worker, id: {}", applicationMapping.getId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("push to application mapping aggregation worker, id: {}", applicationMapping.getId());
+ }
+
graph.start(applicationMapping);
});
}
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/global/GlobalTraceSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/global/GlobalTraceSpanListener.java
index 0a48369..655fb4d 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/global/GlobalTraceSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/global/GlobalTraceSpanListener.java
@@ -37,7 +37,7 @@ public class GlobalTraceSpanListener implements GlobalTraceIdsListener {
private static final Logger logger = LoggerFactory.getLogger(GlobalTraceSpanListener.class);
- private List<String> globalTraceIds = new LinkedList<>();
+ private final List<String> globalTraceIds = new LinkedList<>();
private SegmentCoreInfo segmentCoreInfo;
@Override public boolean containsPoint(Point point) {
@@ -58,17 +58,19 @@ public class GlobalTraceSpanListener implements GlobalTraceIdsListener {
}
@Override public void build() {
- logger.debug("global trace listener build");
+ if (logger.isDebugEnabled()) {
+ logger.debug("global trace listener build");
+ }
Graph<GlobalTrace> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class);
- for (String globalTraceId : globalTraceIds) {
+ globalTraceIds.forEach(globalTraceId -> {
GlobalTrace globalTrace = new GlobalTrace();
globalTrace.setId(segmentCoreInfo.getSegmentId() + Const.ID_SPLIT + globalTraceId);
globalTrace.setTraceId(globalTraceId);
globalTrace.setSegmentId(segmentCoreInfo.getSegmentId());
globalTrace.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
graph.start(globalTrace);
- }
+ });
}
public static class Factory implements SpanListenerFactory {
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/mapping/InstanceMappingSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/mapping/InstanceMappingSpanListener.java
index a620870..8213a32 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/mapping/InstanceMappingSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/mapping/InstanceMappingSpanListener.java
@@ -43,7 +43,10 @@ public class InstanceMappingSpanListener implements EntrySpanListener {
}
@Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
- logger.debug("instance mapping listener parse reference");
+ if (logger.isDebugEnabled()) {
+ logger.debug("instance mapping listener parse reference");
+ }
+
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
InstanceMapping instanceMapping = new InstanceMapping();
@@ -60,10 +63,16 @@ public class InstanceMappingSpanListener implements EntrySpanListener {
}
@Override public void build() {
- logger.debug("instance mapping listener build");
+ if (logger.isDebugEnabled()) {
+ logger.debug("instance mapping listener build");
+ }
+
Graph<InstanceMapping> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_MAPPING_GRAPH_ID, InstanceMapping.class);
instanceMappings.forEach(instanceMapping -> {
- logger.debug("push to instance mapping aggregation worker, id: {}", instanceMapping.getId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("push to instance mapping aggregation worker, id: {}", instanceMapping.getId());
+ }
+
graph.start(instanceMapping);
});
}
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
index 30269c4..c9e7b13 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
@@ -76,7 +76,11 @@ public class SegmentDurationSpanListener implements FirstSpanListener, EntrySpan
@Override public void build() {
Graph<SegmentDuration> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SEGMENT_DURATION_GRAPH_ID, SegmentDuration.class);
- logger.debug("segment duration listener build");
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("segment duration listener build");
+ }
+
if (entryOperationNameIds.size() == 0) {
segmentDuration.getServiceName().add(serviceNameCacheService.get(firstOperationNameId).getServiceName());
} else {
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java
index 1079bbc..ed5c7d7 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java
@@ -49,7 +49,7 @@ public class ServiceNameAggregationWorker extends AggregationWorker<ServiceName,
}
@Override public int queueSize() {
- return 256;
+ return 4096;
}
}
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java
index 1491c95..2c2b6f1 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java
@@ -61,7 +61,7 @@ public class ServiceNameHeartBeatPersistenceWorker extends MergePersistenceWorke
@Override
public int queueSize() {
- return 1024;
+ return 4096;
}
}
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java
index 44a8ca6..261f163 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
*/
public class ServiceNameSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener {
- private List<ServiceName> serviceNames;
+ private final List<ServiceName> serviceNames;
private ServiceNameSpanListener() {
this.serviceNames = new LinkedList<>();
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceMetricSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceMetricSpanListener.java
index 2d4fbcb..fe03954 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceMetricSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceMetricSpanListener.java
@@ -148,7 +148,10 @@ public class ServiceReferenceMetricSpanListener implements EntrySpanListener, Ex
}
@Override public void build() {
- logger.debug("service reference listener build");
+ if (logger.isDebugEnabled()) {
+ logger.debug("service reference listener build");
+ }
+
Graph<ServiceReferenceMetric> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class);
entryReferenceMetric.forEach(serviceReferenceMetric -> {
String metricId = serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue();
@@ -157,7 +160,10 @@ public class ServiceReferenceMetricSpanListener implements EntrySpanListener, Ex
serviceReferenceMetric.setId(id);
serviceReferenceMetric.setMetricId(metricId);
serviceReferenceMetric.setTimeBucket(minuteTimeBucket);
- logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId());
+ }
graph.start(serviceReferenceMetric);
});
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterRemoteWorker.java
index 6ab9f48..147166e 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterRemoteWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterRemoteWorker.java
@@ -19,15 +19,11 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.register;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
-import org.apache.skywalking.apm.collector.remote.service.Selector;
+import org.apache.skywalking.apm.collector.remote.service.*;
import org.apache.skywalking.apm.collector.storage.table.register.Application;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -44,8 +40,11 @@ public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker<Applic
return WorkerIdDefine.APPLICATION_REGISTER_REMOTE_WORKER;
}
- @Override protected void onWork(Application message) throws WorkerException {
- logger.debug("application code: {}", message.getApplicationCode());
+ @Override protected void onWork(Application message) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("application code: {}", message.getApplicationCode());
+ }
+
onNext(message);
}
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterSerialWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterSerialWorker.java
index 4b34ffc..7db65e1 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterSerialWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterSerialWorker.java
@@ -19,19 +19,15 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.register;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
-import org.apache.skywalking.apm.collector.core.util.Const;
+import org.apache.skywalking.apm.collector.core.util.*;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.register.IApplicationRegisterDAO;
import org.apache.skywalking.apm.collector.storage.table.register.Application;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -53,8 +49,10 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker<Ap
return WorkerIdDefine.APPLICATION_REGISTER_SERIAL_WORKER;
}
- @Override protected void onWork(Application application) throws WorkerException {
- logger.debug("register application, application code: {}", application.getApplicationCode());
+ @Override protected void onWork(Application application) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("register application, application code: {}", application.getApplicationCode());
+ }
int applicationId;
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterRemoteWorker.java
index 9d43322..0b9da8a 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterRemoteWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterRemoteWorker.java
@@ -19,15 +19,11 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.register;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
-import org.apache.skywalking.apm.collector.remote.service.Selector;
+import org.apache.skywalking.apm.collector.remote.service.*;
import org.apache.skywalking.apm.collector.storage.table.register.Instance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -44,8 +40,10 @@ public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker<Instance,
super(moduleManager);
}
- @Override protected void onWork(Instance instance) throws WorkerException {
- logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime());
+ @Override protected void onWork(Instance instance) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime());
+ }
onNext(instance);
}
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterSerialWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterSerialWorker.java
index 1880b2b..8aa02f1 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterSerialWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterSerialWorker.java
@@ -19,19 +19,15 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.register;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
-import org.apache.skywalking.apm.collector.core.util.Const;
+import org.apache.skywalking.apm.collector.core.util.*;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
import org.apache.skywalking.apm.collector.storage.table.register.Instance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -53,8 +49,10 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker<Insta
return WorkerIdDefine.INSTANCE_REGISTER_SERIAL_WORKER;
}
- @Override protected void onWork(Instance instance) throws WorkerException {
- logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
+ @Override protected void onWork(Instance instance) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
+ }
int instanceId;
if (BooleanUtils.valueToBoolean(instance.getIsAddress())) {
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterRemoteWorker.java
index 6c720fe..69c6784 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterRemoteWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterRemoteWorker.java
@@ -19,15 +19,11 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.register;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
-import org.apache.skywalking.apm.collector.remote.service.Selector;
+import org.apache.skywalking.apm.collector.remote.service.*;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -44,8 +40,11 @@ public class NetworkAddressRegisterRemoteWorker extends AbstractRemoteWorker<Net
return WorkerIdDefine.NETWORK_ADDRESS_REGISTER_REMOTE_WORKER;
}
- @Override protected void onWork(NetworkAddress message) throws WorkerException {
- logger.debug("network address: {}", message.getNetworkAddress());
+ @Override protected void onWork(NetworkAddress message) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("network address: {}", message.getNetworkAddress());
+ }
+
onNext(message);
}
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterSerialWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterSerialWorker.java
index 728a581..1b1b97f 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterSerialWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterSerialWorker.java
@@ -19,17 +19,14 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.register;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.NetworkAddressCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -51,8 +48,11 @@ public class NetworkAddressRegisterSerialWorker extends AbstractLocalAsyncWorker
return WorkerIdDefine.NETWORK_ADDRESS_REGISTER_SERIAL_WORKER;
}
- @Override protected void onWork(NetworkAddress networkAddress) throws WorkerException {
- logger.debug("register network address, address: {}", networkAddress.getNetworkAddress());
+ @Override protected void onWork(NetworkAddress networkAddress) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("register network address, address: {}", networkAddress.getNetworkAddress());
+ }
+
if (networkAddress.getAddressId() == 0) {
int addressId = networkAddressCacheService.getAddressId(networkAddress.getNetworkAddress());
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterSerialWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterSerialWorker.java
index 800a49a..c90ad78 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterSerialWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterSerialWorker.java
@@ -50,7 +50,10 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
}
@Override protected void onWork(ServiceName serviceName) {
- logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
+ }
+
int serviceId = serviceIdCacheService.get(serviceName.getApplicationId(), serviceName.getSrcSpanType(), serviceName.getServiceName());
if (serviceId == 0) {
long now = System.currentTimeMillis();
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java
index b4cbf98..400a033 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java
@@ -73,7 +73,10 @@ public class InstanceIDService implements IInstanceIDService {
}
@Override public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, AgentOsInfo osInfo) {
- logger.debug("get or getOrCreate instance id by agent UUID, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
+ if (logger.isDebugEnabled()) {
+ logger.debug("get or getOrCreate instance id by agent UUID, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
+ }
+
int instanceId = getInstanceCacheService().getInstanceIdByAgentUUID(applicationId, agentUUID);
if (instanceId == 0) {
@@ -95,7 +98,10 @@ public class InstanceIDService implements IInstanceIDService {
}
@Override public int getOrCreateByAddressId(int applicationId, int addressId, long registerTime) {
- logger.debug("get or getOrCreate instance id by address id, application id: {}, address id: {}, registerTime: {}", applicationId, addressId, registerTime);
+ if (logger.isDebugEnabled()) {
+ logger.debug("get or getOrCreate instance id by address id, application id: {}, address id: {}, registerTime: {}", applicationId, addressId, registerTime);
+ }
+
int instanceId = getInstanceCacheService().getInstanceIdByAddressId(applicationId, addressId);
if (instanceId == 0) {
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferManager.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferManager.java
index 09fc97c..75f5cc6 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferManager.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferManager.java
@@ -97,7 +97,10 @@ public enum SegmentBufferManager {
}
private void newDataFile() throws IOException {
- logger.debug("getOrCreate new segment buffer file");
+ if (logger.isDebugEnabled()) {
+ logger.debug("getOrCreate new segment buffer file");
+ }
+
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferReader.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferReader.java
index 79cc25e..d1ce1c4 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferReader.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferReader.java
@@ -109,7 +109,10 @@ public enum SegmentBufferReader {
}
for (File dataFile : dataFiles) {
- logger.debug("Reading segment buffer data file, file name: {}", dataFile.getAbsolutePath());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reading segment buffer data file, file name: {}", dataFile.getAbsolutePath());
+ }
+
OffsetManager.INSTANCE.setReadOffset(dataFile.getName(), 0);
if (!read(dataFile, 0)) {
break;
@@ -137,7 +140,11 @@ public enum SegmentBufferReader {
final int serialized = upstreamSegment.getSerializedSize();
readFileOffset = readFileOffset + CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
- logger.debug("read segment buffer from file: {}, offset: {}, file length: {}", readFile.getName(), readFileOffset, readFile.length());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("read segment buffer from file: {}, offset: {}, file length: {}", readFile.getName(), readFileOffset, readFile.length());
+ }
+
OffsetManager.INSTANCE.setReadOffset(readFileOffset);
}
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
index 85acfc0..335cbab 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
@@ -41,7 +41,7 @@ public class SegmentParse {
private static final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
private final ModuleManager moduleManager;
- private List<SpanListener> spanListeners;
+ private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
@@ -65,14 +65,19 @@ public class SegmentParse {
SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject);
if (!preBuild(traceIds, segmentDecorator)) {
- logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentCoreInfo.getSegmentId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentCoreInfo.getSegmentId());
+ }
if (source.equals(ISegmentParseService.Source.Agent)) {
writeToBufferFile(segmentCoreInfo.getSegmentId(), segment);
}
return false;
} else {
- logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
+ if (logger.isDebugEnabled()) {
+ logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
+ }
+
notifyListenerToBuild();
buildSegment(segmentCoreInfo.getSegmentId(), segmentDecorator.toByteArray());
return true;
@@ -167,7 +172,10 @@ public class SegmentParse {
@GraphComputingMetric(name = "/segment/parse/bufferFile/write")
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
- logger.debug("push to segment buffer write worker, id: {}", id);
+ if (logger.isDebugEnabled()) {
+ logger.debug("push to segment buffer write worker, id: {}", id);
+ }
+
SegmentStandardization standardization = new SegmentStandardization(id);
standardization.setUpstreamSegment(upstreamSegment);
Graph<SegmentStandardization> graph = GraphManager.INSTANCE.findGraph(GraphIdDefine.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class);
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentPersistenceWorker.java
index 8803e2d..f811a34 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentPersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentPersistenceWorker.java
@@ -59,7 +59,7 @@ public class SegmentPersistenceWorker extends NonMergePersistenceWorker<Segment>
@Override
public int queueSize() {
- return 1024;
+ return 2048;
}
}
}
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/standardization/SpanIdExchanger.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/standardization/SpanIdExchanger.java
index b3b0b59..0465a9e 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/standardization/SpanIdExchanger.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/standardization/SpanIdExchanger.java
@@ -62,7 +62,9 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
int componentId = componentLibraryCatalogService.getComponentId(standardBuilder.getComponent());
if (componentId == 0) {
- logger.debug("component: {} in application: {} exchange failed", standardBuilder.getComponent(), applicationId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("component: {} in application: {} exchange failed", standardBuilder.getComponent(), applicationId);
+ }
return false;
} else {
standardBuilder.toBuilder();
@@ -75,7 +77,9 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
int peerId = networkAddressIDService.getOrCreate(standardBuilder.getPeer());
if (peerId == 0) {
- logger.debug("peer: {} in application: {} exchange failed", standardBuilder.getPeer(), applicationId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("peer: {} in application: {} exchange failed", standardBuilder.getPeer(), applicationId);
+ }
return false;
} else {
standardBuilder.toBuilder();
@@ -93,7 +97,9 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
int operationNameId = serviceNameService.getOrCreate(applicationId, standardBuilder.getSpanTypeValue(), operationName);
if (operationNameId == 0) {
- logger.debug("service name: {} from application id: {} exchange failed", operationName, applicationId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("service name: {} from application id: {} exchange failed", operationName, applicationId);
+ }
return false;
} else {
standardBuilder.toBuilder();
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/test/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/service/SegmentBase64Printer.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/test/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/service/SegmentBase64Printer.java
index f41d248..2fdd346 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/test/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/service/SegmentBase64Printer.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/test/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/service/SegmentBase64Printer.java
@@ -35,7 +35,7 @@ public class SegmentBase64Printer {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentBase64Printer.class);
public static void main(String[] args) throws InvalidProtocolBufferException {
- String segmentBase64 = "CgwKCgMXjPKUga3WgBsSvAEIARiF7Jq1nywgp+yatZ8sKlASDAoKAnPAqKD5rNaAGxgBIAIqDjEyNy4wLjAuMTo5MDkyOAJCFC9zZW5kTWVzc2FnZS97Y291bnR9UhQvc2VuZE1lc3NhZ2Uve2NvdW50fTocS2Fma2EvVHJhY2UtdG9waWMtMS9Db25zdW1lclgEYBt6GwoJbXEuYnJva2VyEg4xMjcuMC4wLjE6OTA5MnoZCghtcS50b3BpYxINVHJhY2UtdG9waWMtMRImEP///////////wEY/+uatZ8sILTsmrWfLDD///////////8BUAIYAiAD";
+ String segmentBase64 = "CgoKCJbf2NPCLBAQEiAQ////////////ARiV39jTwiwg2+7Y08IsMNQPWANgARIlCAEYn9/Y08IsILns2NPCLDCUyAJA////////////AVABWAJgAxInCAIQARif39jTwiwguezY08IsMJTIAkD///////////8BUAFYAmADEicIAxACGJ/f2NPCLCC57NjTwiwwlMgCQP///////////wFQAVgCYAMSJwgEEAMYn9/Y08IsILns2NPCLDCUyAJA////////////AVABWAJgAxInCAUQBBif39jTwiwguezY08IsMJTIAkD///////////8BUAFYAmADEicIBhAFGJ/f2NPCLCC57NjTwiwwlMgCQP///////////wFQAVgCYAMSJwgHEAYYn9/Y08IsILns2NPCLDCUyAJA////////////AVABWAJgAxInCAgQBxif39jTwiwg [...]
byte[] binarySegment = Base64.getDecoder().decode(segmentBase64);
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(binarySegment);
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractLocalAsyncWorkerProvider.java
index 472f4d1..3a62831 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractLocalAsyncWorkerProvider.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractLocalAsyncWorkerProvider.java
@@ -39,7 +39,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends QueueData,
workerCreateListener.addWorker(localAsyncWorker);
LocalAsyncWorkerRef<INPUT, OUTPUT> localAsyncWorkerRef = new LocalAsyncWorkerRef<>(localAsyncWorker);
- DataCarrier<INPUT> dataCarrier = new DataCarrier<>(1, queueSize());
+ DataCarrier<INPUT> dataCarrier = new DataCarrier<>(1, 10000);
localAsyncWorkerRef.setQueueEventHandler(dataCarrier);
dataCarrier.consume(localAsyncWorkerRef, 1);
return localAsyncWorkerRef;
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractWorker.java
index e3b697b..1c0d9a0 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractWorker.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractWorker<INPUT, OUTPUT> implements NodeProcessor<INPUT, OUTPUT> {
- private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
+ private static final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
private final ModuleManager moduleManager;
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/LocalAsyncWorkerRef.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/LocalAsyncWorkerRef.java
index 7da4828..7dcc19d 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/LocalAsyncWorkerRef.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/LocalAsyncWorkerRef.java
@@ -18,23 +18,21 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
import org.apache.skywalking.apm.collector.core.annotations.trace.BatchParameter;
import org.apache.skywalking.apm.collector.core.data.QueueData;
import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class LocalAsyncWorkerRef<INPUT extends QueueData, OUTPUT extends QueueData> extends WorkerRef<INPUT, OUTPUT> implements IConsumer<INPUT> {
- private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerRef.class);
+ private static final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerRef.class);
private DataCarrier<INPUT> dataCarrier;
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/RemoteWorkerRef.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/RemoteWorkerRef.java
index 28e3847..78410c4 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/RemoteWorkerRef.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/RemoteWorkerRef.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
*/
public class RemoteWorkerRef<INPUT extends RemoteData, OUTPUT extends RemoteData> extends WorkerRef<INPUT, OUTPUT> {
- private final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
+ private static final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
private final AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker;
private final RemoteSenderService remoteSenderService;
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/AggregationWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/AggregationWorker.java
index 93498f8..8639b78 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/AggregationWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/AggregationWorker.java
@@ -18,22 +18,20 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.impl;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.data.MergeDataCache;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public abstract class AggregationWorker<INPUT extends StreamData, OUTPUT extends StreamData> extends AbstractLocalAsyncWorker<INPUT, OUTPUT> {
- private final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);
+ private static final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);
- private MergeDataCache<OUTPUT> mergeDataCache;
+ private final MergeDataCache<OUTPUT> mergeDataCache;
private int messageNum;
public AggregationWorker(ModuleManager moduleManager) {
@@ -52,13 +50,10 @@ public abstract class AggregationWorker<INPUT extends StreamData, OUTPUT extends
messageNum++;
aggregate(output);
- if (messageNum >= 100) {
+ if (messageNum >= 1000 || message.getEndOfBatchContext().isEndOfBatch()) {
sendToNext();
messageNum = 0;
}
- if (message.getEndOfBatchContext().isEndOfBatch()) {
- sendToNext();
- }
}
private void sendToNext() throws WorkerException {
@@ -70,8 +65,12 @@ public abstract class AggregationWorker<INPUT extends StreamData, OUTPUT extends
throw new WorkerException(e.getMessage(), e);
}
}
+
mergeDataCache.getLast().collection().forEach((String id, OUTPUT data) -> {
- logger.debug(data.toString());
+ if (logger.isDebugEnabled()) {
+ logger.debug(data.toString());
+ }
+
onNext(data);
});
mergeDataCache.finishReadingLast();
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/MergePersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/MergePersistenceWorker.java
index 2fc7d80..ae4a14e 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/MergePersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/MergePersistenceWorker.java
@@ -32,7 +32,7 @@ import static java.util.Objects.nonNull;
*/
public abstract class MergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData> extends PersistenceWorker<INPUT_AND_OUTPUT, MergeDataCollection<INPUT_AND_OUTPUT>> {
- private final Logger logger = LoggerFactory.getLogger(MergePersistenceWorker.class);
+ private static final Logger logger = LoggerFactory.getLogger(MergePersistenceWorker.class);
private final MergeDataCache<INPUT_AND_OUTPUT> mergeDataCache;
@@ -46,22 +46,21 @@ public abstract class MergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData
}
@Override protected List<Object> prepareBatch(MergeDataCollection<INPUT_AND_OUTPUT> collection) {
- List<Object> insertBatchCollection = new LinkedList<>();
- List<Object> updateBatchCollection = new LinkedList<>();
+ List<Object> batchCollection = new LinkedList<>();
collection.collection().forEach((id, data) -> {
if (needMergeDBData()) {
INPUT_AND_OUTPUT dbData = persistenceDAO().get(id);
if (nonNull(dbData)) {
dbData.mergeAndFormulaCalculateData(data);
try {
- updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(dbData));
+ batchCollection.add(persistenceDAO().prepareBatchUpdate(dbData));
onNext(dbData);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
} else {
try {
- insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
+ batchCollection.add(persistenceDAO().prepareBatchInsert(data));
onNext(data);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
@@ -69,7 +68,7 @@ public abstract class MergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData
}
} else {
try {
- insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
+ batchCollection.add(persistenceDAO().prepareBatchInsert(data));
onNext(data);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
@@ -77,8 +76,7 @@ public abstract class MergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData
}
});
- insertBatchCollection.addAll(updateBatchCollection);
- return insertBatchCollection;
+ return batchCollection;
}
@Override protected void cacheData(INPUT_AND_OUTPUT input) {
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/NonMergePersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/NonMergePersistenceWorker.java
index 86e3eac..767e092 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/NonMergePersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/NonMergePersistenceWorker.java
@@ -30,7 +30,7 @@ import org.slf4j.*;
*/
public abstract class NonMergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData> extends PersistenceWorker<INPUT_AND_OUTPUT, NonMergeDataCollection<INPUT_AND_OUTPUT>> {
- private final Logger logger = LoggerFactory.getLogger(NonMergePersistenceWorker.class);
+ private static final Logger logger = LoggerFactory.getLogger(NonMergePersistenceWorker.class);
private final NonMergeDataCache<INPUT_AND_OUTPUT> mergeDataCache;
@@ -50,7 +50,7 @@ public abstract class NonMergePersistenceWorker<INPUT_AND_OUTPUT extends StreamD
}
@Override protected List<Object> prepareBatch(NonMergeDataCollection<INPUT_AND_OUTPUT> collection) {
- List<Object> insertBatchCollection = new LinkedList<>();
+ List<Object> insertBatchCollection = new ArrayList<>(collection.collection().size());
collection.collection().forEach(data -> {
try {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.java
index 056f2cc..f06efa9 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.java
@@ -36,7 +36,7 @@ import org.slf4j.*;
*/
public abstract class PersistenceWorker<INPUT_AND_OUTPUT extends StreamData, COLLECTION extends Collection> extends AbstractLocalAsyncWorker<INPUT_AND_OUTPUT, INPUT_AND_OUTPUT> {
- private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
+ private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
private final IBatchDAO batchDAO;
private final int blockBatchPersistenceSize;
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/timer/PersistenceTimer.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/timer/PersistenceTimer.java
index 7aac165..e37deb6 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/timer/PersistenceTimer.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/timer/PersistenceTimer.java
@@ -62,29 +62,42 @@ public enum PersistenceTimer {
@SuppressWarnings("unchecked")
private void extractDataAndSave(IBatchDAO batchDAO, List<PersistenceWorker> persistenceWorkers) {
- logger.debug("Extract data and save");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Extract data and save");
+ }
+
long startTime = System.currentTimeMillis();
try {
List batchAllCollection = new LinkedList();
persistenceWorkers.forEach((PersistenceWorker worker) -> {
- logger.debug("extract {} worker data and save", worker.getClass().getName());
+ if (logger.isDebugEnabled()) {
+ logger.debug("extract {} worker data and save", worker.getClass().getName());
+ }
+
if (worker.flushAndSwitch()) {
List<?> batchCollection = worker.buildBatchCollection();
- logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
+ }
batchAllCollection.addAll(batchCollection);
}
});
+ if (debug) {
+ logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+ }
batchDAO.batchPersistence(batchAllCollection);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
} finally {
- logger.debug("persistence data save finish");
+ if (logger.isDebugEnabled()) {
+ logger.debug("persistence data save finish");
+ }
}
if (debug) {
- long endTime = System.currentTimeMillis();
- logger.info("batch persistence duration: {} ms", endTime - startTime);
+ logger.info("batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
}
diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml
index 05f9e61..7732967 100644
--- a/apm-collector/apm-collector-boot/src/main/resources/application.yml
+++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml
@@ -76,6 +76,11 @@ storage:
indexShardsNumber: 2
indexReplicasNumber: 0
highPerformanceMode: true
+ # Batch processing settings; refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+ bulkActions: 2000 # Execute the bulk every 2000 requests
+ bulkSize: 20 # Flush the bulk every 20 MB
+ flushInterval: 10 # Flush the bulk every 10 seconds regardless of the number of requests
+ concurrentRequests: 2 # The number of concurrent requests
# Set a timeout on metric data. After the timeout has expired, the metric data will automatically be deleted.
traceDataTTL: 90 # Unit is minute
minuteMetricDataTTL: 90 # Unit is minute
@@ -106,5 +111,4 @@ configuration:
# default:
# host: localhost
# port: 9411
-# contextPath: /
-#
\ No newline at end of file
+# contextPath: /
\ No newline at end of file
diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java
index 3cae234..95226c2 100644
--- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java
+++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java
@@ -37,7 +37,7 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
private final Logger logger = LoggerFactory.getLogger(ServiceIdCacheGuavaService.class);
- private final Cache<String, Integer> serviceIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
+ private final Cache<String, Integer> serviceIdCache = CacheBuilder.newBuilder().maximumSize(1000000).build();
private final ModuleManager moduleManager;
private IServiceNameCacheDAO serviceNameCacheDAO;
diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java
index c587815..37deb09 100644
--- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java
+++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java
@@ -18,18 +18,15 @@
package org.apache.skywalking.apm.collector.cache.guava.service;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.*;
import org.apache.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.cache.IServiceNameCacheDAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
-import static java.util.Objects.isNull;
-import static java.util.Objects.nonNull;
+import static java.util.Objects.*;
/**
* @author peng-yongsheng
@@ -38,7 +35,7 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
private final Logger logger = LoggerFactory.getLogger(ServiceNameCacheGuavaService.class);
- private final Cache<Integer, ServiceName> serviceCache = CacheBuilder.newBuilder().maximumSize(10000).build();
+ private final Cache<Integer, ServiceName> serviceCache = CacheBuilder.newBuilder().maximumSize(1000000).build();
private final ModuleManager moduleManager;
private IServiceNameCacheDAO serviceNameCacheDAO;
@@ -66,6 +63,8 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
serviceName = getServiceNameCacheDAO().get(serviceId);
if (nonNull(serviceName)) {
serviceCache.put(serviceId, serviceName);
+ } else {
+ logger.warn("Service id {} is not in cache and persistent storage.", serviceId);
}
}
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index 8499a14..20a982c 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -18,24 +18,18 @@
package org.apache.skywalking.apm.collector.client.elasticsearch;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.LinkedList;
-import java.util.List;
+import java.net.*;
+import java.util.*;
import java.util.function.Consumer;
-import org.apache.skywalking.apm.collector.client.Client;
-import org.apache.skywalking.apm.collector.client.ClientException;
-import org.apache.skywalking.apm.collector.client.NameSpace;
+import org.apache.skywalking.apm.collector.client.*;
import org.apache.skywalking.apm.collector.core.data.CommonTable;
-import org.apache.skywalking.apm.collector.core.util.Const;
-import org.apache.skywalking.apm.collector.core.util.StringUtils;
+import org.apache.skywalking.apm.collector.core.util.*;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.get.GetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
+import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
@@ -45,11 +39,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.reindex.DeleteByQueryAction;
-import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
+import org.elasticsearch.index.reindex.*;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/cache/Window.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/cache/Window.java
index 5e8dd4d..9790526 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/cache/Window.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/cache/Window.java
@@ -67,7 +67,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
}
}
- protected WINDOW_COLLECTION getCurrent() {
+ private WINDOW_COLLECTION getCurrent() {
return pointer;
}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
index ece2da9..0ee3a6b 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
@@ -26,6 +26,16 @@ import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
*/
public abstract class StreamData extends AbstractData implements QueueData {
+ private static final ByteColumn[] BYTE_COLUMNS = {};
+
+ private static final StringListColumn[] STRING_LIST_COLUMNS = {};
+
+ private static final LongListColumn[] LONG_LIST_COLUMNS = {};
+
+ private static final IntegerListColumn[] INTEGER_LIST_COLUMNS = {};
+
+ private static final DoubleListColumn[] DOUBLE_LIST_COLUMNS = {};
+
private EndOfBatchContext endOfBatchContext;
@Override public final EndOfBatchContext getEndOfBatchContext() {
@@ -41,18 +51,18 @@ public abstract class StreamData extends AbstractData implements QueueData {
DoubleColumn[] doubleColumns, StringListColumn[] stringListColumns,
LongListColumn[] longListColumns,
IntegerListColumn[] integerListColumns, DoubleListColumn[] doubleListColumns) {
- super(stringColumns, longColumns, integerColumns, doubleColumns, new ByteColumn[0], stringListColumns, longListColumns, integerListColumns, doubleListColumns);
+ super(stringColumns, longColumns, integerColumns, doubleColumns, BYTE_COLUMNS, stringListColumns, longListColumns, integerListColumns, doubleListColumns);
}
public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
IntegerColumn[] integerColumns, DoubleColumn[] doubleColumns) {
- super(stringColumns, longColumns, integerColumns, doubleColumns, new ByteColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+ super(stringColumns, longColumns, integerColumns, doubleColumns, BYTE_COLUMNS, STRING_LIST_COLUMNS, LONG_LIST_COLUMNS, INTEGER_LIST_COLUMNS, DOUBLE_LIST_COLUMNS);
}
public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
IntegerColumn[] integerColumns, DoubleColumn[] doubleColumns,
ByteColumn[] byteColumns) {
- super(stringColumns, longColumns, integerColumns, doubleColumns, byteColumns, new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+ super(stringColumns, longColumns, integerColumns, doubleColumns, byteColumns, STRING_LIST_COLUMNS, LONG_LIST_COLUMNS, INTEGER_LIST_COLUMNS, DOUBLE_LIST_COLUMNS);
}
@Override public final String selectKey() {
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
index d62d11f..8a5f017 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.collector.core.data.*;
* @author peng-yongsheng
*/
public class NonMergeOperation implements MergeOperation {
+
@Override public String operate(String newValue, String oldValue) {
return oldValue;
}
diff --git a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/MetricTree.java b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/MetricTree.java
index 63e2e6d..1500b84 100644
--- a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/MetricTree.java
+++ b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/MetricTree.java
@@ -20,14 +20,10 @@ package org.apache.skywalking.apm.collector.instrument;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import org.apache.skywalking.apm.collector.core.annotations.trace.BatchParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author wusheng, peng-yongsheng
@@ -61,8 +57,8 @@ public enum MetricTree implements Runnable {
logBuffer.append("##################################################################################################################").append(lineSeparator);
logBuffer.append("# Collector Service Report #").append(lineSeparator);
logBuffer.append("##################################################################################################################").append(lineSeparator);
- metrics.forEach((MetricNode metric) -> metric.toOutput(new ReportWriter() {
+ metrics.forEach((MetricNode metric) -> metric.toOutput(new ReportWriter() {
@Override public void writeMetricName(String name) {
logBuffer.append(name).append(lineSeparator);
}
diff --git a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/ServiceMetricTracing.java b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/ServiceMetricTracing.java
index 29b6af9..add628e 100644
--- a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/ServiceMetricTracing.java
+++ b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/ServiceMetricTracing.java
@@ -19,19 +19,15 @@
package org.apache.skywalking.apm.collector.instrument;
import java.lang.reflect.Method;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import net.bytebuddy.implementation.bind.annotation.AllArguments;
-import net.bytebuddy.implementation.bind.annotation.Origin;
-import net.bytebuddy.implementation.bind.annotation.RuntimeType;
-import net.bytebuddy.implementation.bind.annotation.SuperCall;
-import net.bytebuddy.implementation.bind.annotation.This;
+import java.util.concurrent.*;
+import net.bytebuddy.implementation.bind.annotation.*;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
/**
* @author wu-sheng
*/
public class ServiceMetricTracing {
+
private volatile ConcurrentHashMap<Method, ServiceMetric> metrics = new ConcurrentHashMap<>();
ServiceMetricTracing() {
diff --git a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/tools/ReportFormatter.java b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/tools/ReportFormatter.java
index 595b5f1..7489510 100644
--- a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/tools/ReportFormatter.java
+++ b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/tools/ReportFormatter.java
@@ -18,10 +18,8 @@
package org.apache.skywalking.apm.collector.instrument.tools;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.*;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -36,20 +34,13 @@ class ReportFormatter {
logger.info(System.lineSeparator() + "Formatted report: ");
report.getMetrics().forEach(metric -> {
- String[] subMetricNames = metric.getMetricName().split("/");
-
- String metricName = "";
- for (String subMetricName : subMetricNames) {
- if (subMetricName != null && !subMetricName.equals("")) {
- metricName = metricName + "/" + subMetricName;
-
- if (!metricMap.containsKey(metricName)) {
- Metric newMetric = new Metric();
- newMetric.setMetricName(metricName);
- metricMap.put(metricName, newMetric);
- }
- metricMap.get(metricName).merge(metric);
- }
+ if (metricMap.containsKey(metric.getMetricName())) {
+ Metric existMetric = metricMap.get(metric.getMetricName());
+ existMetric.setTotal(existMetric.getTotal() + metric.getTotal());
+ existMetric.setCalls(existMetric.getCalls() + metric.getCalls());
+ existMetric.setAvg(existMetric.getTotal() / existMetric.getCalls());
+ } else {
+ metricMap.put(metric.getMetricName(), metric);
}
});
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java
index 98460f9..86d0a6e 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java
@@ -32,7 +32,9 @@ public class ForeverFirstSelector implements RemoteClientSelector {
private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
@Override public RemoteClient select(List<RemoteClient> clients, RemoteData remoteData) {
- logger.debug("clients size: {}", clients.size());
+ if (logger.isDebugEnabled()) {
+ logger.debug("clients size: {}", clients.size());
+ }
return clients.get(0);
}
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java
index 723e757..fcf1af4 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.collector.storage.base.dao;
+import java.io.IOException;
import org.apache.skywalking.apm.collector.core.data.StreamData;
/**
@@ -27,9 +28,9 @@ public interface IPersistenceDAO<INSERT, UPDATE, STREAM_DATA extends StreamData>
STREAM_DATA get(String id);
- INSERT prepareBatchInsert(STREAM_DATA data);
+ INSERT prepareBatchInsert(STREAM_DATA data) throws IOException;
- UPDATE prepareBatchUpdate(STREAM_DATA data);
+ UPDATE prepareBatchUpdate(STREAM_DATA data) throws IOException;
void deleteHistory(Long timeBucketBefore);
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
index af918f3..78a8449 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
@@ -43,8 +43,11 @@ public class ApplicationAlarm extends StreamData implements Alarm {
new IntegerColumn(ApplicationAlarmTable.APPLICATION_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ApplicationAlarm() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
index d9627cc..9c4aaad 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
@@ -44,8 +44,11 @@ public class ApplicationAlarmList extends StreamData {
new IntegerColumn(ApplicationAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ApplicationAlarmList() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
index 8485720..a48b3a5 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
@@ -44,8 +44,11 @@ public class ApplicationReferenceAlarm extends StreamData implements Alarm {
new IntegerColumn(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ApplicationReferenceAlarm() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
index 6ff140a..2349761 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
@@ -44,8 +44,11 @@ public class ApplicationReferenceAlarmList extends StreamData {
new IntegerColumn(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ApplicationReferenceAlarmList() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
index cd3ab1d..1643bdc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
@@ -44,8 +44,11 @@ public class InstanceAlarm extends StreamData implements Alarm {
new IntegerColumn(InstanceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public InstanceAlarm() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
index ef8f4bc..7acc3bc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
@@ -44,8 +44,11 @@ public class InstanceAlarmList extends StreamData {
new IntegerColumn(InstanceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public InstanceAlarmList() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
index ca4a883..d94b97d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
@@ -46,8 +46,11 @@ public class InstanceReferenceAlarm extends StreamData implements Alarm {
new IntegerColumn(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public InstanceReferenceAlarm() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
index 4814a0b..c6e38da 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
@@ -46,8 +46,11 @@ public class InstanceReferenceAlarmList extends StreamData {
new IntegerColumn(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public InstanceReferenceAlarmList() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
index f3dc1dc..12220f6 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
@@ -45,8 +45,11 @@ public class ServiceAlarm extends StreamData implements Alarm {
new IntegerColumn(ServiceAlarmTable.SERVICE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ServiceAlarm() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
index 98bb5aa..f60cc49 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
@@ -45,8 +45,11 @@ public class ServiceAlarmList extends StreamData {
new IntegerColumn(ServiceAlarmListTable.SERVICE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ServiceAlarmList() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
index b4483f8..1d070d4 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
@@ -48,8 +48,11 @@ public class ServiceReferenceAlarm extends StreamData implements Alarm {
new IntegerColumn(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ServiceReferenceAlarm() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
index d42b240..ead2316 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
@@ -48,8 +48,11 @@ public class ServiceReferenceAlarmList extends StreamData {
new IntegerColumn(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ServiceReferenceAlarmList() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
index 31c44a3..7bca4bd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
@@ -42,8 +42,11 @@ public class ApplicationComponent extends StreamData {
new IntegerColumn(ApplicationComponentTable.APPLICATION_ID, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ApplicationComponent() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
index 811b051..cbce18f 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
@@ -42,8 +42,11 @@ public class ApplicationMapping extends StreamData {
new IntegerColumn(ApplicationMappingTable.MAPPING_APPLICATION_ID, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ApplicationMapping() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
index 694ddef..2c16f27 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
@@ -62,8 +62,11 @@ public class ApplicationMetric extends StreamData implements Metric {
new IntegerColumn(ApplicationMetricTable.APPLICATION_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ApplicationMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
index e0f994d..ec7ef4d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
@@ -64,8 +64,11 @@ public class ApplicationReferenceMetric extends StreamData implements Metric {
new IntegerColumn(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ApplicationReferenceMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
index 44bb44c..7acc38d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
@@ -37,8 +37,14 @@ public class GlobalTrace extends StreamData {
new LongColumn(GlobalTraceTable.TIME_BUCKET, new CoverMergeOperation()),
};
+ private static final IntegerColumn[] INTEGER_COLUMNS = {
+ };
+
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public GlobalTrace() {
- super(STRING_COLUMNS, LONG_COLUMNS, new IntegerColumn[0], new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
index 1974509..688899e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
@@ -44,8 +44,11 @@ public class ResponseTimeDistribution extends StreamData {
new IntegerColumn(ResponseTimeDistributionTable.STEP, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ResponseTimeDistribution() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
index 5183663..3982b24 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
@@ -43,8 +43,11 @@ public class InstanceMapping extends StreamData {
new IntegerColumn(InstanceMappingTable.ADDRESS_ID, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public InstanceMapping() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
index 8ffe0d8..0d5850e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
@@ -60,8 +60,11 @@ public class InstanceMetric extends StreamData implements Metric {
new IntegerColumn(InstanceMetricTable.INSTANCE_ID, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public InstanceMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
index 822d8e7..471a174 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
@@ -62,8 +62,11 @@ public class InstanceReferenceMetric extends StreamData implements Metric {
new IntegerColumn(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public InstanceReferenceMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
index 76d4cc6..ec50077 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
@@ -44,8 +44,11 @@ public class GCMetric extends StreamData {
new IntegerColumn(GCMetricTable.PHRASE, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public GCMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
index b37514a..9c5e6ea 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
@@ -46,8 +46,11 @@ public class MemoryMetric extends StreamData {
new IntegerColumn(MemoryMetricTable.IS_HEAP, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public MemoryMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
index 5418136..e9caf53 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
@@ -46,8 +46,11 @@ public class MemoryPoolMetric extends StreamData {
new IntegerColumn(MemoryPoolMetricTable.POOL_TYPE, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public MemoryPoolMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
index 928d032..38728da 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
@@ -39,8 +39,14 @@ public class Application extends StreamData {
new IntegerColumn(ApplicationTable.IS_ADDRESS, new CoverMergeOperation()),
};
+ private static final LongColumn[] LONG_COLUMNS = {
+ };
+
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public Application() {
- super(STRING_COLUMNS, new LongColumn[0], INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
index e8a78cb..78f9a36 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
@@ -47,8 +47,11 @@ public class Instance extends StreamData {
new IntegerColumn(InstanceTable.IS_ADDRESS, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public Instance() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
index 831f113..5e248b1 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
@@ -39,9 +39,14 @@ public class NetworkAddress extends StreamData {
new IntegerColumn(NetworkAddressTable.SERVER_TYPE, new CoverMergeOperation()),
};
- public NetworkAddress() {
- super(STRING_COLUMNS, new LongColumn[0], INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+ private static final LongColumn[] LONG_COLUMNS = {
+ };
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
+ public NetworkAddress() {
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
index d04a7f0..47c2c98 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
@@ -44,8 +44,11 @@ public class ServiceName extends StreamData {
new IntegerColumn(ServiceNameTable.SRC_SPAN_TYPE, new CoverMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ServiceName() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
index 3a411c7..81581c3 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
@@ -39,8 +39,14 @@ public class Segment extends StreamData {
new ByteColumn(SegmentTable.DATA_BINARY, new CoverMergeOperation()),
};
+ private static final IntegerColumn[] INTEGER_COLUMNS = {
+ };
+
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public Segment() {
- super(STRING_COLUMNS, LONG_COLUMNS, new IntegerColumn[0], new DoubleColumn[0], BYTE_COLUMNS);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS, BYTE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
index 91cdf74..e7fa9a2 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
@@ -61,8 +61,11 @@ public class ServiceMetric extends StreamData implements Metric {
new IntegerColumn(ServiceMetricTable.SERVICE_ID, new NonMergeOperation()),
};
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+
public ServiceMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
index e644e02..d51d32a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
@@ -67,9 +67,11 @@ public class ServiceReferenceMetric extends StreamData implements Metric {
new IntegerColumn(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
};
- public ServiceReferenceMetric() {
- super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+ private static final DoubleColumn[] DOUBLE_COLUMNS = {
+ };
+ public ServiceReferenceMetric() {
+ super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
}
@Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/MetricTransformUtil.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/MetricTransformUtil.java
index 708ce12..95efd15 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/MetricTransformUtil.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/MetricTransformUtil.java
@@ -18,9 +18,10 @@
package org.apache.skywalking.apm.collector.storage.es;
+import java.io.IOException;
import java.util.Map;
-import org.apache.skywalking.apm.collector.storage.table.Metric;
-import org.apache.skywalking.apm.collector.storage.table.MetricColumns;
+import org.apache.skywalking.apm.collector.storage.table.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
/**
* @author peng-yongsheng
@@ -51,26 +52,26 @@ public enum MetricTransformUtil {
target.setMqTransactionAverageDuration(((Number)source.get(MetricColumns.MQ_TRANSACTION_AVERAGE_DURATION.getName())).longValue());
}
- public void esStreamDataToEsData(Metric source, Map<String, Object> target) {
- target.put(MetricColumns.TIME_BUCKET.getName(), source.getTimeBucket());
- target.put(MetricColumns.SOURCE_VALUE.getName(), source.getSourceValue());
+ public void esStreamDataToEsData(Metric source, XContentBuilder target) throws IOException {
+ target.field(MetricColumns.TIME_BUCKET.getName(), source.getTimeBucket());
+ target.field(MetricColumns.SOURCE_VALUE.getName(), source.getSourceValue());
- target.put(MetricColumns.TRANSACTION_CALLS.getName(), source.getTransactionCalls());
- target.put(MetricColumns.TRANSACTION_ERROR_CALLS.getName(), source.getTransactionErrorCalls());
- target.put(MetricColumns.TRANSACTION_DURATION_SUM.getName(), source.getTransactionDurationSum());
- target.put(MetricColumns.TRANSACTION_ERROR_DURATION_SUM.getName(), source.getTransactionErrorDurationSum());
- target.put(MetricColumns.TRANSACTION_AVERAGE_DURATION.getName(), source.getTransactionAverageDuration());
+ target.field(MetricColumns.TRANSACTION_CALLS.getName(), source.getTransactionCalls());
+ target.field(MetricColumns.TRANSACTION_ERROR_CALLS.getName(), source.getTransactionErrorCalls());
+ target.field(MetricColumns.TRANSACTION_DURATION_SUM.getName(), source.getTransactionDurationSum());
+ target.field(MetricColumns.TRANSACTION_ERROR_DURATION_SUM.getName(), source.getTransactionErrorDurationSum());
+ target.field(MetricColumns.TRANSACTION_AVERAGE_DURATION.getName(), source.getTransactionAverageDuration());
- target.put(MetricColumns.BUSINESS_TRANSACTION_CALLS.getName(), source.getBusinessTransactionCalls());
- target.put(MetricColumns.BUSINESS_TRANSACTION_ERROR_CALLS.getName(), source.getBusinessTransactionErrorCalls());
- target.put(MetricColumns.BUSINESS_TRANSACTION_DURATION_SUM.getName(), source.getBusinessTransactionDurationSum());
- target.put(MetricColumns.BUSINESS_TRANSACTION_ERROR_DURATION_SUM.getName(), source.getBusinessTransactionErrorDurationSum());
- target.put(MetricColumns.BUSINESS_TRANSACTION_AVERAGE_DURATION.getName(), source.getBusinessTransactionAverageDuration());
+ target.field(MetricColumns.BUSINESS_TRANSACTION_CALLS.getName(), source.getBusinessTransactionCalls());
+ target.field(MetricColumns.BUSINESS_TRANSACTION_ERROR_CALLS.getName(), source.getBusinessTransactionErrorCalls());
+ target.field(MetricColumns.BUSINESS_TRANSACTION_DURATION_SUM.getName(), source.getBusinessTransactionDurationSum());
+ target.field(MetricColumns.BUSINESS_TRANSACTION_ERROR_DURATION_SUM.getName(), source.getBusinessTransactionErrorDurationSum());
+ target.field(MetricColumns.BUSINESS_TRANSACTION_AVERAGE_DURATION.getName(), source.getBusinessTransactionAverageDuration());
- target.put(MetricColumns.MQ_TRANSACTION_CALLS.getName(), source.getMqTransactionCalls());
- target.put(MetricColumns.MQ_TRANSACTION_ERROR_CALLS.getName(), source.getMqTransactionErrorCalls());
- target.put(MetricColumns.MQ_TRANSACTION_DURATION_SUM.getName(), source.getMqTransactionDurationSum());
- target.put(MetricColumns.MQ_TRANSACTION_ERROR_DURATION_SUM.getName(), source.getMqTransactionErrorDurationSum());
- target.put(MetricColumns.MQ_TRANSACTION_AVERAGE_DURATION.getName(), source.getMqTransactionAverageDuration());
+ target.field(MetricColumns.MQ_TRANSACTION_CALLS.getName(), source.getMqTransactionCalls());
+ target.field(MetricColumns.MQ_TRANSACTION_ERROR_CALLS.getName(), source.getMqTransactionErrorCalls());
+ target.field(MetricColumns.MQ_TRANSACTION_DURATION_SUM.getName(), source.getMqTransactionDurationSum());
+ target.field(MetricColumns.MQ_TRANSACTION_ERROR_DURATION_SUM.getName(), source.getMqTransactionErrorDurationSum());
+ target.field(MetricColumns.MQ_TRANSACTION_AVERAGE_DURATION.getName(), source.getMqTransactionAverageDuration());
}
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java
index 83b28eb..cdde086 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java
@@ -33,6 +33,10 @@ public class StorageModuleEsConfig extends ElasticSearchClientConfig {
private int hourMetricDataTTL = 36;
private int dayMetricDataTTL = 45;
private int monthMetricDataTTL = 18;
+ private int bulkActions = 2000;
+ private int bulkSize = 20;
+ private int flushInterval = 10;
+ private int concurrentRequests = 2;
int getIndexShardsNumber() {
return indexShardsNumber;
@@ -97,4 +101,36 @@ public class StorageModuleEsConfig extends ElasticSearchClientConfig {
void setMonthMetricDataTTL(int monthMetricDataTTL) {
this.monthMetricDataTTL = monthMetricDataTTL == 0 ? 18 : monthMetricDataTTL;
}
+
+ public int getBulkActions() {
+ return bulkActions;
+ }
+
+ public void setBulkActions(int bulkActions) {
+ this.bulkActions = bulkActions == 0 ? 2000 : bulkActions;
+ }
+
+ public int getBulkSize() {
+ return bulkSize;
+ }
+
+ public void setBulkSize(int bulkSize) {
+ this.bulkSize = bulkSize == 0 ? 20 : bulkSize;
+ }
+
+ public int getFlushInterval() {
+ return flushInterval;
+ }
+
+ public void setFlushInterval(int flushInterval) {
+ this.flushInterval = flushInterval == 0 ? 10 : flushInterval;
+ }
+
+ public int getConcurrentRequests() {
+ return concurrentRequests;
+ }
+
+ public void setConcurrentRequests(int concurrentRequests) {
+ this.concurrentRequests = concurrentRequests == 0 ? 2 : concurrentRequests;
+ }
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
index a71226a..b33000a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
@@ -48,7 +48,7 @@ import org.apache.skywalking.apm.collector.storage.dao.rtd.*;
import org.apache.skywalking.apm.collector.storage.dao.smp.*;
import org.apache.skywalking.apm.collector.storage.dao.srmp.*;
import org.apache.skywalking.apm.collector.storage.dao.ui.*;
-import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
+import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchProcessEsDAO;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
import org.apache.skywalking.apm.collector.storage.es.dao.*;
import org.apache.skywalking.apm.collector.storage.es.dao.acp.*;
@@ -105,7 +105,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes(), nameSpace);
this.registerServiceImplementation(ITTLConfigService.class, new TTLConfigService(config));
- this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
registerCacheDAO();
registerRegisterDAO();
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
index af6f70d..307cc81 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.collector.storage.es.base.dao;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.StreamData;
@@ -25,6 +26,7 @@ import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.*;
@@ -56,17 +58,17 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
}
}
- protected abstract Map<String, Object> esStreamDataToEsData(STREAM_DATA streamData);
+ protected abstract XContentBuilder esStreamDataToEsData(STREAM_DATA streamData) throws IOException;
@Override
- public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
- Map<String, Object> source = esStreamDataToEsData(streamData);
+ public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) throws IOException {
+ XContentBuilder source = esStreamDataToEsData(streamData);
return getClient().prepareIndex(tableName(), streamData.getId()).setSource(source);
}
@Override
- public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
- Map<String, Object> source = esStreamDataToEsData(streamData);
+ public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) throws IOException {
+ XContentBuilder source = esStreamDataToEsData(streamData);
return getClient().prepareUpdate(tableName(), streamData.getId()).setDoc(source);
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchEsDAO.java
deleted file mode 100644
index 2cd5036..0000000
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchEsDAO.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.collector.storage.es.base.dao;
-
-import java.util.List;
-import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
-import org.apache.skywalking.apm.collector.core.annotations.trace.BatchParameter;
-import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
-import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
-import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequestBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author peng-yongsheng
- */
-public class BatchEsDAO extends EsDAO implements IBatchDAO {
-
- private final Logger logger = LoggerFactory.getLogger(BatchEsDAO.class);
-
- public BatchEsDAO(ElasticSearchClient client) {
- super(client);
- }
-
- @GraphComputingMetric(name = "/persistence/batchPersistence/")
- @Override public void batchPersistence(@BatchParameter List<?> batchCollection) {
- if (logger.isDebugEnabled()) {
- logger.debug("bulk data size: {}", batchCollection.size());
- }
- if (CollectionUtils.isNotEmpty(batchCollection)) {
- BulkRequestBuilder bulkRequest = getClient().prepareBulk();
-
- batchCollection.forEach(builder -> {
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder)builder);
- }
- if (builder instanceof UpdateRequestBuilder) {
- bulkRequest.add((UpdateRequestBuilder)builder);
- }
- });
-
- BulkResponse bulkResponse = bulkRequest.execute().actionGet();
- if (bulkResponse.hasFailures()) {
- logger.error(bulkResponse.buildFailureMessage());
- for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
- logger.error("Bulk request failure, index: {}, id: {}", itemResponse.getIndex(), itemResponse.getId());
- }
- }
- }
- }
-}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchProcessEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchProcessEsDAO.java
new file mode 100644
index 0000000..d3af889
--- /dev/null
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchProcessEsDAO.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.storage.es.base.dao;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.apm.collector.core.UnexpectedException;
+import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
+import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
+import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.*;
+import org.slf4j.*;
+
+/**
+ * Elasticsearch batch DAO that queues index and update requests on a lazily created
+ * {@link BulkProcessor} instead of sending one blocking bulk request per call.
+ *
+ * <p>NOTE(review): the processor is created on first use without synchronization and is never
+ * closed in this class; confirm the caller is single-threaded and flushes on shutdown.
+ *
+ * @author peng-yongsheng
+ */
+public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchProcessEsDAO.class);
+
+    private BulkProcessor bulkProcessor;
+    private final int bulkActions;
+    private final int bulkSize;
+    private final int flushInterval;
+    private final int concurrentRequests;
+
+    /**
+     * @param client project Elasticsearch client wrapper handed to {@link EsDAO}
+     * @param bulkActions value passed to {@code setBulkActions} (action-count flush threshold)
+     * @param bulkSize value passed to {@code setBulkSize}, interpreted in megabytes
+     * @param flushInterval value passed to {@code setFlushInterval}, interpreted in seconds
+     * @param concurrentRequests value passed to {@code setConcurrentRequests}
+     */
+    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
+        int concurrentRequests) {
+        super(client);
+        this.bulkActions = bulkActions;
+        this.bulkSize = bulkSize;
+        this.flushInterval = flushInterval;
+        this.concurrentRequests = concurrentRequests;
+    }
+
+    /**
+     * Queues every {@link IndexRequestBuilder} and {@link UpdateRequestBuilder} in the collection on the
+     * bulk processor; the processor decides when the requests are actually sent.
+     *
+     * @param batchCollection request builders to persist; may be empty
+     */
+    @GraphComputingMetric(name = "/persistence/batchPersistence/")
+    @Override public void batchPersistence(List<?> batchCollection) {
+        if (bulkProcessor == null) {
+            this.bulkProcessor = createBulkProcessor();
+        }
+
+        if (logger.isDebugEnabled()) {
+            // Null-safe on purpose: the emptiness check below only runs after this statement.
+            logger.debug("bulk data size: {}", batchCollection == null ? 0 : batchCollection.size());
+        }
+
+        if (CollectionUtils.isNotEmpty(batchCollection)) {
+            batchCollection.forEach(builder -> {
+                if (builder instanceof IndexRequestBuilder) {
+                    this.bulkProcessor.add(((IndexRequestBuilder)builder).request());
+                } else if (builder instanceof UpdateRequestBuilder) {
+                    this.bulkProcessor.add(((UpdateRequestBuilder)builder).request());
+                } else {
+                    // Not an index or update builder: report it rather than drop it silently.
+                    logger.warn("Unsupported batch element ignored, type: {}",
+                        builder == null ? null : builder.getClass().getName());
+                }
+            });
+        }
+    }
+
+    /**
+     * Builds the bulk processor from the configured thresholds.
+     *
+     * @return a new processor bound to the native Elasticsearch client
+     * @throws UnexpectedException if the native client cannot be read from the wrapper
+     */
+    private BulkProcessor createBulkProcessor() {
+        ElasticSearchClient elasticSearchClient = getClient();
+
+        Client client;
+        try {
+            // The native client is read from the wrapper's declared "client" field by reflection.
+            Field field = elasticSearchClient.getClass().getDeclaredField("client");
+            field.setAccessible(true);
+            client = (Client)field.get(elasticSearchClient);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new UnexpectedException(e.getMessage());
+        }
+
+        return BulkProcessor.builder(
+            client,
+            new BulkProcessor.Listener() {
+                @Override
+                public void beforeBulk(long executionId,
+                    BulkRequest request) {
+                }
+
+                @Override
+                public void afterBulk(long executionId,
+                    BulkRequest request,
+                    BulkResponse response) {
+                    // A bulk call can complete while individual items were rejected; surface those.
+                    if (response.hasFailures()) {
+                        logger.error("{} data bulk has failed items: {}",
+                            request.numberOfActions(), response.buildFailureMessage());
+                    }
+                }
+
+                @Override
+                public void afterBulk(long executionId,
+                    BulkRequest request,
+                    Throwable failure) {
+                    // Message fills the placeholder; the trailing throwable gives SLF4J the stack trace.
+                    logger.error("{} data bulk failed, reason: {}",
+                        request.numberOfActions(), failure.getMessage(), failure);
+                }
+            })
+            .setBulkActions(bulkActions)
+            .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
+            .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
+            .setConcurrentRequests(concurrentRequests)
+            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
+            .build();
+    }
+}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
index 1ac1490..4dc8428 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
@@ -18,7 +18,8 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
@@ -26,6 +27,7 @@ import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersisten
import org.apache.skywalking.apm.collector.storage.table.global.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -48,12 +50,13 @@ public class GlobalTraceEsPersistenceDAO extends AbstractPersistenceEsDAO<Global
return globalTrace;
}
- @Override protected Map<String, Object> esStreamDataToEsData(GlobalTrace streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(GlobalTraceTable.SEGMENT_ID.getName(), streamData.getSegmentId());
- target.put(GlobalTraceTable.TRACE_ID.getName(), streamData.getTraceId());
- target.put(GlobalTraceTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- return target;
+ @Override protected XContentBuilder esStreamDataToEsData(GlobalTrace streamData) throws IOException {
+ return XContentFactory.jsonBuilder()
+ .startObject()
+ .field(GlobalTraceTable.SEGMENT_ID.getName(), streamData.getSegmentId())
+ .field(GlobalTraceTable.TRACE_ID.getName(), streamData.getTraceId())
+ .field(GlobalTraceTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceHeartBeatEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceHeartBeatEsPersistenceDAO.java
index f7ee828..83f3452 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceHeartBeatEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceHeartBeatEsPersistenceDAO.java
@@ -18,7 +18,8 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
@@ -28,6 +29,7 @@ import org.apache.skywalking.apm.collector.storage.table.register.*;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
/**
@@ -63,9 +65,11 @@ public class InstanceHeartBeatEsPersistenceDAO extends EsDAO implements IInstanc
throw new UnexpectedException("Received an instance heart beat message under instance id= " + data.getId() + " , which doesn't exist.");
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(Instance data) {
- Map<String, Object> source = new HashMap<>();
- source.put(InstanceTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime());
+ @Override public UpdateRequestBuilder prepareBatchUpdate(Instance data) throws IOException {
+ XContentBuilder source = XContentFactory.jsonBuilder().startObject()
+ .field(InstanceTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime())
+ .endObject();
+
return getClient().prepareUpdate(InstanceTable.TABLE, data.getId()).setDoc(source);
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
index 24a739d..9814964 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
@@ -18,14 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
-import com.google.gson.Gson;
-import java.util.*;
+import java.io.IOException;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.segment.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.*;
@@ -35,9 +35,7 @@ import org.slf4j.*;
*/
public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDurationPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, SegmentDuration> {
- private final Logger logger = LoggerFactory.getLogger(SegmentDurationEsPersistenceDAO.class);
-
- private final Gson gson = new Gson();
+ private static final Logger logger = LoggerFactory.getLogger(SegmentDurationEsPersistenceDAO.class);
public SegmentDurationEsPersistenceDAO(ElasticSearchClient client) {
super(client);
@@ -54,16 +52,18 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
}
@Override
- public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
- Map<String, Object> target = new HashMap<>();
- target.put(SegmentDurationTable.SEGMENT_ID.getName(), data.getSegmentId());
- target.put(SegmentDurationTable.APPLICATION_ID.getName(), data.getApplicationId());
- target.put(SegmentDurationTable.SERVICE_NAME.getName(), gson.toJson(data.getServiceName()));
- target.put(SegmentDurationTable.DURATION.getName(), data.getDuration());
- target.put(SegmentDurationTable.START_TIME.getName(), data.getStartTime());
- target.put(SegmentDurationTable.END_TIME.getName(), data.getEndTime());
- target.put(SegmentDurationTable.IS_ERROR.getName(), data.getIsError());
- target.put(SegmentDurationTable.TIME_BUCKET.getName(), data.getTimeBucket());
+ public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) throws IOException {
+ XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+ .field(SegmentDurationTable.SEGMENT_ID.getName(), data.getSegmentId())
+ .field(SegmentDurationTable.APPLICATION_ID.getName(), data.getApplicationId())
+ .array(SegmentDurationTable.SERVICE_NAME.getName(), data.getServiceName())
+ .field(SegmentDurationTable.DURATION.getName(), data.getDuration())
+ .field(SegmentDurationTable.START_TIME.getName(), data.getStartTime())
+ .field(SegmentDurationTable.END_TIME.getName(), data.getEndTime())
+ .field(SegmentDurationTable.IS_ERROR.getName(), data.getIsError())
+ .field(SegmentDurationTable.TIME_BUCKET.getName(), data.getTimeBucket())
+ .endObject();
+
return getClient().prepareIndex(SegmentDurationTable.TABLE, data.getId()).setSource(target);
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
index 00dcdc3..6662607 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
+import java.io.IOException;
import java.util.*;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
@@ -26,6 +27,7 @@ import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersisten
import org.apache.skywalking.apm.collector.storage.table.segment.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -47,11 +49,11 @@ public class SegmentEsPersistenceDAO extends AbstractPersistenceEsDAO<Segment> i
return segment;
}
- @Override protected Map<String, Object> esStreamDataToEsData(Segment streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(SegmentTable.DATA_BINARY.getName(), new String(Base64.getEncoder().encode(streamData.getDataBinary())));
- target.put(SegmentTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- return target;
+ @Override protected XContentBuilder esStreamDataToEsData(Segment streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(SegmentTable.DATA_BINARY.getName(), new String(Base64.getEncoder().encode(streamData.getDataBinary())))
+ .field(SegmentTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java
index 1c9b826..c3d0525 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java
@@ -18,7 +18,8 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
@@ -28,6 +29,7 @@ import org.apache.skywalking.apm.collector.storage.table.register.*;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
/**
@@ -43,18 +45,24 @@ public class ServiceNameHeartBeatEsPersistenceDAO extends EsDAO implements IServ
@GraphComputingMetric(name = "/persistence/get/" + ServiceNameTable.TABLE + "/heartbeat")
@Override public ServiceName get(String id) {
- GetResponse getResponse = getClient().prepareGet(ServiceNameTable.TABLE, id).get();
+ String[] includeSources = {ServiceNameTable.HEARTBEAT_TIME.getName()};
+ GetResponse getResponse = getClient().prepareGet(ServiceNameTable.TABLE, id).setFetchSource(includeSources, null).get();
if (getResponse.isExists()) {
Map<String, Object> source = getResponse.getSource();
ServiceName serviceName = new ServiceName();
serviceName.setId(id);
- serviceName.setServiceId(((Number)source.get(ServiceNameTable.SERVICE_ID.getName())).intValue());
+ serviceName.setServiceId(Integer.valueOf(id));
serviceName.setHeartBeatTime(((Number)source.get(ServiceNameTable.HEARTBEAT_TIME.getName())).longValue());
- logger.debug("service id: {} is exists", id);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("service id: {} is exists", id);
+ }
return serviceName;
} else {
- logger.debug("service id: {} is not exists", id);
+ if (logger.isDebugEnabled()) {
+ logger.debug("service id: {} is not exists", id);
+ }
return null;
}
}
@@ -63,11 +71,15 @@ public class ServiceNameHeartBeatEsPersistenceDAO extends EsDAO implements IServ
throw new UnexpectedException("Received an service name heart beat message under service id= " + data.getId() + " , which doesn't exist.");
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceName data) {
- logger.info("service name heart beat, service id: {}, heart beat time: {}", data.getId(), data.getHeartBeatTime());
+ @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceName data) throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("service name heart beat, service id: {}, heart beat time: {}", data.getId(), data.getHeartBeatTime());
+ }
+
+ XContentBuilder source = XContentFactory.jsonBuilder().startObject()
+ .field(ServiceNameTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime())
+ .endObject();
- Map<String, Object> source = new HashMap<>();
- source.put(ServiceNameTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime());
return getClient().prepareUpdate(ServiceNameTable.TABLE, data.getId()).setDoc(source);
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/acp/AbstractApplicationComponentEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/acp/AbstractApplicationComponentEsPersistenceDAO.java
index 42377e8..45fa5f9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/acp/AbstractApplicationComponentEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/acp/AbstractApplicationComponentEsPersistenceDAO.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.acp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponentTable;
+import org.apache.skywalking.apm.collector.storage.table.application.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -49,15 +49,14 @@ public abstract class AbstractApplicationComponentEsPersistenceDAO extends Abstr
return applicationComponent;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationComponent streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ApplicationComponentTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override protected final XContentBuilder esStreamDataToEsData(ApplicationComponent streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ApplicationComponentTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(ApplicationComponentTable.COMPONENT_ID.getName(), streamData.getComponentId());
- target.put(ApplicationComponentTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(ApplicationComponentTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
- return target;
+ .field(ApplicationComponentTable.COMPONENT_ID.getName(), streamData.getComponentId())
+ .field(ApplicationComponentTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(ApplicationComponentTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + ApplicationComponentTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/AbstractApplicationAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/AbstractApplicationAlarmListEsPersistenceDAO.java
index 047a786..958f7d7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/AbstractApplicationAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/AbstractApplicationAlarmListEsPersistenceDAO.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -52,17 +52,17 @@ public abstract class AbstractApplicationAlarmListEsPersistenceDAO extends Abstr
return applicationAlarmList;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationAlarmList streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ApplicationAlarmListTable.METRIC_ID.getName(), streamData.getMetricId());
- target.put(ApplicationAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(ApplicationAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected final XContentBuilder esStreamDataToEsData(ApplicationAlarmList streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ApplicationAlarmListTable.METRIC_ID.getName(), streamData.getMetricId())
+ .field(ApplicationAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(ApplicationAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(ApplicationAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(ApplicationAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(ApplicationAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(ApplicationAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(ApplicationAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- return target;
+ .field(ApplicationAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + ApplicationAlarmListTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
index 6b7e3d1..15a4b32 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -54,16 +54,16 @@ public class ApplicationAlarmEsPersistenceDAO extends AbstractPersistenceEsDAO<A
return instanceAlarm;
}
- @Override protected Map<String, Object> esStreamDataToEsData(ApplicationAlarm streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ApplicationAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(ApplicationAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(ApplicationAlarm streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ApplicationAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(ApplicationAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(ApplicationAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(ApplicationAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(ApplicationAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(ApplicationAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(ApplicationAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
- return target;
+ .field(ApplicationAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
index fd5069e..a723f71 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationReferenceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationReferenceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -55,17 +55,17 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends AbstractPersisten
return applicationReferenceAlarm;
}
- @Override protected Map<String, Object> esStreamDataToEsData(ApplicationReferenceAlarm streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ApplicationReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
- target.put(ApplicationReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(ApplicationReferenceAlarm streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ApplicationReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+ .field(ApplicationReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(ApplicationReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(ApplicationReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(ApplicationReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(ApplicationReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(ApplicationReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
- return target;
+ .field(ApplicationReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
index e988a1b..8945091 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationReferenceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationReferenceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -55,17 +55,18 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends AbstractPersi
return applicationReferenceAlarmList;
}
- @Override protected Map<String, Object> esStreamDataToEsData(ApplicationReferenceAlarmList streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ApplicationReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
- target.put(ApplicationReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override
+ protected XContentBuilder esStreamDataToEsData(ApplicationReferenceAlarmList streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ApplicationReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+ .field(ApplicationReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(ApplicationReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(ApplicationReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(ApplicationReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(ApplicationReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(ApplicationReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- return target;
+ .field(ApplicationReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
index 3d53fc5..c19f528 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -55,17 +55,17 @@ public class InstanceAlarmEsPersistenceDAO extends AbstractPersistenceEsDAO<Inst
return instanceAlarm;
}
- @Override protected Map<String, Object> esStreamDataToEsData(InstanceAlarm streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(InstanceAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(InstanceAlarmTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(InstanceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(InstanceAlarm streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(InstanceAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(InstanceAlarmTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(InstanceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(InstanceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(InstanceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(InstanceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(InstanceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(InstanceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
- return target;
+ .field(InstanceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
index 96fef53..5f5ffa7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -55,17 +55,17 @@ public class InstanceAlarmListEsPersistenceDAO extends AbstractPersistenceEsDAO<
return instanceAlarmList;
}
- @Override protected Map<String, Object> esStreamDataToEsData(InstanceAlarmList streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(InstanceAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(InstanceAlarmListTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(InstanceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(InstanceAlarmList streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(InstanceAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(InstanceAlarmListTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(InstanceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(InstanceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(InstanceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(InstanceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(InstanceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(InstanceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- return target;
+ .field(InstanceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
index fdda6ea..7c76791 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -57,19 +57,19 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends AbstractPersistenceE
return instanceReferenceAlarm;
}
- @Override protected Map<String, Object> esStreamDataToEsData(InstanceReferenceAlarm streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(InstanceReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(InstanceReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
- target.put(InstanceReferenceAlarmTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
- target.put(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
- target.put(InstanceReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(InstanceReferenceAlarm streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(InstanceReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(InstanceReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+ .field(InstanceReferenceAlarmTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+ .field(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+ .field(InstanceReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(InstanceReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(InstanceReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(InstanceReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(InstanceReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(InstanceReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
- return target;
+ .field(InstanceReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
index d695cc5..f90943a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -57,19 +57,19 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends AbstractPersiste
return serviceReferenceAlarmList;
}
- @Override protected Map<String, Object> esStreamDataToEsData(InstanceReferenceAlarmList streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(InstanceReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(InstanceReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
- target.put(InstanceReferenceAlarmListTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
- target.put(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
- target.put(InstanceReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(InstanceReferenceAlarmList streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(InstanceReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(InstanceReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+ .field(InstanceReferenceAlarmListTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+ .field(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+ .field(InstanceReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(InstanceReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(InstanceReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(InstanceReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(InstanceReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(InstanceReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- return target;
+ .field(InstanceReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
index a16d2fa..8a330ee 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -56,18 +56,18 @@ public class ServiceAlarmEsPersistenceDAO extends AbstractPersistenceEsDAO<Servi
return serviceAlarm;
}
- @Override protected Map<String, Object> esStreamDataToEsData(ServiceAlarm streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ServiceAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(ServiceAlarmTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(ServiceAlarmTable.SERVICE_ID.getName(), streamData.getServiceId());
- target.put(ServiceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(ServiceAlarm streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ServiceAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(ServiceAlarmTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(ServiceAlarmTable.SERVICE_ID.getName(), streamData.getServiceId())
+ .field(ServiceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(ServiceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(ServiceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(ServiceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(ServiceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(ServiceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
- return target;
+ .field(ServiceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
index 39e4b2e..33078a9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -56,18 +56,18 @@ public class ServiceAlarmListEsPersistenceDAO extends AbstractPersistenceEsDAO<S
return serviceAlarmList;
}
- @Override protected Map<String, Object> esStreamDataToEsData(ServiceAlarmList streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ServiceAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(ServiceAlarmListTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(ServiceAlarmListTable.SERVICE_ID.getName(), streamData.getServiceId());
- target.put(ServiceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(ServiceAlarmList streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ServiceAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(ServiceAlarmListTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(ServiceAlarmListTable.SERVICE_ID.getName(), streamData.getServiceId())
+ .field(ServiceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(ServiceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(ServiceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(ServiceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(ServiceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(ServiceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- return target;
+ .field(ServiceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
index 407258a..25f819d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceReferenceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceReferenceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -59,21 +59,21 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends AbstractPersistenceEs
return serviceReferenceAlarm;
}
- @Override protected Map<String, Object> esStreamDataToEsData(ServiceReferenceAlarm streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ServiceReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(ServiceReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
- target.put(ServiceReferenceAlarmTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
- target.put(ServiceReferenceAlarmTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
- target.put(ServiceReferenceAlarmTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId());
- target.put(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId());
- target.put(ServiceReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(ServiceReferenceAlarm streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ServiceReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(ServiceReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+ .field(ServiceReferenceAlarmTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+ .field(ServiceReferenceAlarmTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+ .field(ServiceReferenceAlarmTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId())
+ .field(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId())
+ .field(ServiceReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(ServiceReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(ServiceReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(ServiceReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(ServiceReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(ServiceReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
- return target;
+ .field(ServiceReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
index ade36d3..d1de511 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceReferenceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceReferenceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -59,21 +59,21 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends AbstractPersisten
return serviceReferenceAlarmList;
}
- @Override protected Map<String, Object> esStreamDataToEsData(ServiceReferenceAlarmList streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ServiceReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(ServiceReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
- target.put(ServiceReferenceAlarmListTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
- target.put(ServiceReferenceAlarmListTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
- target.put(ServiceReferenceAlarmListTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId());
- target.put(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId());
- target.put(ServiceReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+ @Override protected XContentBuilder esStreamDataToEsData(ServiceReferenceAlarmList streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ServiceReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(ServiceReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+ .field(ServiceReferenceAlarmListTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+ .field(ServiceReferenceAlarmListTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+ .field(ServiceReferenceAlarmListTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId())
+ .field(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId())
+ .field(ServiceReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
- target.put(ServiceReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
- target.put(ServiceReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+ .field(ServiceReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+ .field(ServiceReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
- target.put(ServiceReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- return target;
+ .field(ServiceReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/amp/AbstractApplicationMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/amp/AbstractApplicationMetricEsPersistenceDAO.java
index b4f8d8b..2dbfadd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/amp/AbstractApplicationMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/amp/AbstractApplicationMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.amp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
-import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetric;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetricTable;
import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
+import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
+import org.apache.skywalking.apm.collector.storage.table.application.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -55,18 +55,19 @@ public abstract class AbstractApplicationMetricEsPersistenceDAO extends Abstract
return applicationMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ApplicationMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override protected final XContentBuilder esStreamDataToEsData(ApplicationMetric streamData) throws IOException {
+ XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+ .field(ApplicationMetricTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(ApplicationMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId());
+ .field(ApplicationMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId())
- MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+ .field(ApplicationMetricTable.SATISFIED_COUNT.getName(), streamData.getSatisfiedCount())
+ .field(ApplicationMetricTable.TOLERATING_COUNT.getName(), streamData.getToleratingCount())
+ .field(ApplicationMetricTable.FRUSTRATED_COUNT.getName(), streamData.getFrustratedCount());
- target.put(ApplicationMetricTable.SATISFIED_COUNT.getName(), streamData.getSatisfiedCount());
- target.put(ApplicationMetricTable.TOLERATING_COUNT.getName(), streamData.getToleratingCount());
- target.put(ApplicationMetricTable.FRUSTRATED_COUNT.getName(), streamData.getFrustratedCount());
+ MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+ target.endObject();
return target;
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ampp/AbstractApplicationMappingEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ampp/AbstractApplicationMappingEsPersistenceDAO.java
index b02bd15..182be94 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ampp/AbstractApplicationMappingEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ampp/AbstractApplicationMappingEsPersistenceDAO.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ampp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMappingTable;
+import org.apache.skywalking.apm.collector.storage.table.application.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -49,15 +49,14 @@ public abstract class AbstractApplicationMappingEsPersistenceDAO extends Abstrac
return applicationMapping;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationMapping streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ApplicationMappingTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override protected final XContentBuilder esStreamDataToEsData(ApplicationMapping streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ApplicationMappingTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(ApplicationMappingTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(ApplicationMappingTable.MAPPING_APPLICATION_ID.getName(), streamData.getMappingApplicationId());
- target.put(ApplicationMappingTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
- return target;
+ .field(ApplicationMappingTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(ApplicationMappingTable.MAPPING_APPLICATION_ID.getName(), streamData.getMappingApplicationId())
+ .field(ApplicationMappingTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + ApplicationMappingTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/armp/AbstractApplicationReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/armp/AbstractApplicationReferenceMetricEsPersistenceDAO.java
index 0ef7935..6246199 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/armp/AbstractApplicationReferenceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/armp/AbstractApplicationReferenceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.armp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.application.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -56,19 +56,21 @@ public abstract class AbstractApplicationReferenceMetricEsPersistenceDAO extends
return applicationReferenceMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationReferenceMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ApplicationReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override
+ protected final XContentBuilder esStreamDataToEsData(ApplicationReferenceMetric streamData) throws IOException {
+ XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+ .field(ApplicationReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(ApplicationReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
+ .field(ApplicationReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
- MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+ .field(ApplicationReferenceMetricTable.SATISFIED_COUNT.getName(), streamData.getSatisfiedCount())
+ .field(ApplicationReferenceMetricTable.TOLERATING_COUNT.getName(), streamData.getToleratingCount())
+ .field(ApplicationReferenceMetricTable.FRUSTRATED_COUNT.getName(), streamData.getFrustratedCount());
- target.put(ApplicationReferenceMetricTable.SATISFIED_COUNT.getName(), streamData.getSatisfiedCount());
- target.put(ApplicationReferenceMetricTable.TOLERATING_COUNT.getName(), streamData.getToleratingCount());
- target.put(ApplicationReferenceMetricTable.FRUSTRATED_COUNT.getName(), streamData.getFrustratedCount());
+ MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+ target.endObject();
return target;
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/cpu/AbstractCpuMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/cpu/AbstractCpuMetricEsPersistenceDAO.java
index fcae67f..6ffafd6 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/cpu/AbstractCpuMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/cpu/AbstractCpuMetricEsPersistenceDAO.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.cpu;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.jvm.CpuMetric;
-import org.apache.skywalking.apm.collector.storage.table.jvm.CpuMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.jvm.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -52,16 +52,15 @@ public abstract class AbstractCpuMetricEsPersistenceDAO extends AbstractPersiste
return cpuMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(CpuMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(CpuMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override protected final XContentBuilder esStreamDataToEsData(CpuMetric streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(CpuMetricTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(CpuMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(CpuMetricTable.USAGE_PERCENT.getName(), streamData.getUsagePercent());
- target.put(CpuMetricTable.TIMES.getName(), streamData.getTimes());
- target.put(CpuMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
- return target;
+ .field(CpuMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(CpuMetricTable.USAGE_PERCENT.getName(), streamData.getUsagePercent())
+ .field(CpuMetricTable.TIMES.getName(), streamData.getTimes())
+ .field(CpuMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + CpuMetricTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/gc/AbstractGCMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/gc/AbstractGCMetricEsPersistenceDAO.java
index c215b85..bd1fc53 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/gc/AbstractGCMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/gc/AbstractGCMetricEsPersistenceDAO.java
@@ -18,11 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.gc;
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
import org.apache.skywalking.apm.collector.storage.table.jvm.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -53,18 +55,17 @@ public abstract class AbstractGCMetricEsPersistenceDAO extends AbstractPersisten
return gcMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(GCMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(GCMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override protected final XContentBuilder esStreamDataToEsData(GCMetric streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(GCMetricTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(GCMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(GCMetricTable.PHRASE.getName(), streamData.getPhrase());
- target.put(GCMetricTable.COUNT.getName(), streamData.getCount());
- target.put(GCMetricTable.TIMES.getName(), streamData.getTimes());
- target.put(GCMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
- target.put(GCMetricTable.DURATION.getName(), streamData.getDuration());
-
- return target;
+ .field(GCMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(GCMetricTable.PHRASE.getName(), streamData.getPhrase())
+ .field(GCMetricTable.COUNT.getName(), streamData.getCount())
+ .field(GCMetricTable.TIMES.getName(), streamData.getTimes())
+ .field(GCMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .field(GCMetricTable.DURATION.getName(), streamData.getDuration())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + GCMetricTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java
index a9d48f2..0abc207 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.imp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.instance.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -52,14 +52,15 @@ public abstract class AbstractInstanceMetricEsPersistenceDAO extends AbstractPer
return instanceMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(InstanceMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(InstanceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
- target.put(InstanceMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(InstanceMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
+ @Override protected final XContentBuilder esStreamDataToEsData(InstanceMetric streamData) throws IOException {
+ XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+ .field(InstanceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
+ .field(InstanceMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(InstanceMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+ target.endObject();
return target;
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/impp/AbstractInstanceMappingEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/impp/AbstractInstanceMappingEsPersistenceDAO.java
index 093894b..6543832 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/impp/AbstractInstanceMappingEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/impp/AbstractInstanceMappingEsPersistenceDAO.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.impp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMappingTable;
+import org.apache.skywalking.apm.collector.storage.table.instance.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -50,16 +50,15 @@ public abstract class AbstractInstanceMappingEsPersistenceDAO extends AbstractPe
return instanceMapping;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(InstanceMapping streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(InstanceMappingTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override protected final XContentBuilder esStreamDataToEsData(InstanceMapping streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(InstanceMappingTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(InstanceMappingTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(InstanceMappingTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(InstanceMappingTable.ADDRESS_ID.getName(), streamData.getAddressId());
- target.put(InstanceMappingTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
- return target;
+ .field(InstanceMappingTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(InstanceMappingTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(InstanceMappingTable.ADDRESS_ID.getName(), streamData.getAddressId())
+ .field(InstanceMappingTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + InstanceMappingTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java
index fa474b7..9ae8c87 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.irmp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.instance.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -54,17 +54,18 @@ public abstract class AbstractInstanceReferenceMetricEsPersistenceDAO extends Ab
return instanceReferenceMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(InstanceReferenceMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(InstanceReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
-
- target.put(InstanceReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(InstanceReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
- target.put(InstanceReferenceMetricTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
- target.put(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
+ @Override
+ protected final XContentBuilder esStreamDataToEsData(InstanceReferenceMetric streamData) throws IOException {
+ XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+ .field(InstanceReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
+ .field(InstanceReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(InstanceReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+ .field(InstanceReferenceMetricTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+ .field(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+ target.endObject();
return target;
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/memory/AbstractMemoryMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/memory/AbstractMemoryMetricEsPersistenceDAO.java
index 8f12f0d..df85e52 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/memory/AbstractMemoryMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/memory/AbstractMemoryMetricEsPersistenceDAO.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.memory;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
-import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.jvm.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -56,20 +56,19 @@ public abstract class AbstractMemoryMetricEsPersistenceDAO extends AbstractPersi
return memoryMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(MemoryMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(MemoryMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override protected final XContentBuilder esStreamDataToEsData(MemoryMetric streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(MemoryMetricTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(MemoryMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(MemoryMetricTable.IS_HEAP.getName(), streamData.getIsHeap());
- target.put(MemoryMetricTable.INIT.getName(), streamData.getInit());
- target.put(MemoryMetricTable.MAX.getName(), streamData.getMax());
- target.put(MemoryMetricTable.USED.getName(), streamData.getUsed());
- target.put(MemoryMetricTable.COMMITTED.getName(), streamData.getCommitted());
- target.put(MemoryMetricTable.TIMES.getName(), streamData.getTimes());
- target.put(MemoryMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
- return target;
+ .field(MemoryMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(MemoryMetricTable.IS_HEAP.getName(), streamData.getIsHeap())
+ .field(MemoryMetricTable.INIT.getName(), streamData.getInit())
+ .field(MemoryMetricTable.MAX.getName(), streamData.getMax())
+ .field(MemoryMetricTable.USED.getName(), streamData.getUsed())
+ .field(MemoryMetricTable.COMMITTED.getName(), streamData.getCommitted())
+ .field(MemoryMetricTable.TIMES.getName(), streamData.getTimes())
+ .field(MemoryMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + MemoryMetricTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
index 12f87cd..20dd4e9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.mpool;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
-import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.jvm.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -59,20 +59,18 @@ public abstract class AbstractMemoryPoolMetricEsPersistenceDAO extends AbstractP
}
@Override
- protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(MemoryPoolMetricTable.METRIC_ID.getName(), streamData.getMetricId());
-
- target.put(MemoryPoolMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(MemoryPoolMetricTable.POOL_TYPE.getName(), streamData.getPoolType());
- target.put(MemoryPoolMetricTable.INIT.getName(), streamData.getInit());
- target.put(MemoryPoolMetricTable.MAX.getName(), streamData.getMax());
- target.put(MemoryPoolMetricTable.USED.getName(), streamData.getUsed());
- target.put(MemoryPoolMetricTable.COMMITTED.getName(), streamData.getCommitted());
- target.put(MemoryPoolMetricTable.TIMES.getName(), streamData.getTimes());
- target.put(MemoryPoolMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
- return target;
+ protected final XContentBuilder esStreamDataToEsData(MemoryPoolMetric streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(MemoryPoolMetricTable.METRIC_ID.getName(), streamData.getMetricId())
+ .field(MemoryPoolMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(MemoryPoolMetricTable.POOL_TYPE.getName(), streamData.getPoolType())
+ .field(MemoryPoolMetricTable.INIT.getName(), streamData.getInit())
+ .field(MemoryPoolMetricTable.MAX.getName(), streamData.getMax())
+ .field(MemoryPoolMetricTable.USED.getName(), streamData.getUsed())
+ .field(MemoryPoolMetricTable.COMMITTED.getName(), streamData.getCommitted())
+ .field(MemoryPoolMetricTable.TIMES.getName(), streamData.getTimes())
+ .field(MemoryPoolMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + MemoryPoolMetricTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/rtd/AbstractResponseTimeDistributionEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/rtd/AbstractResponseTimeDistributionEsPersistenceDAO.java
index d6fb4a5..a37a76d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/rtd/AbstractResponseTimeDistributionEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/rtd/AbstractResponseTimeDistributionEsPersistenceDAO.java
@@ -18,13 +18,13 @@
package org.apache.skywalking.apm.collector.storage.es.dao.rtd;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.global.ResponseTimeDistribution;
-import org.apache.skywalking.apm.collector.storage.table.global.ResponseTimeDistributionTable;
+import org.apache.skywalking.apm.collector.storage.table.global.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -53,19 +53,19 @@ public abstract class AbstractResponseTimeDistributionEsPersistenceDAO extends A
return responseTimeDistribution;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(ResponseTimeDistribution streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ResponseTimeDistributionTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override
+ protected final XContentBuilder esStreamDataToEsData(ResponseTimeDistribution streamData) throws IOException {
+ return XContentFactory.jsonBuilder().startObject()
+ .field(ResponseTimeDistributionTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(ResponseTimeDistributionTable.STEP.getName(), streamData.getStep());
+ .field(ResponseTimeDistributionTable.STEP.getName(), streamData.getStep())
- target.put(ResponseTimeDistributionTable.CALLS.getName(), streamData.getCalls());
- target.put(ResponseTimeDistributionTable.ERROR_CALLS.getName(), streamData.getErrorCalls());
- target.put(ResponseTimeDistributionTable.SUCCESS_CALLS.getName(), streamData.getSuccessCalls());
+ .field(ResponseTimeDistributionTable.CALLS.getName(), streamData.getCalls())
+ .field(ResponseTimeDistributionTable.ERROR_CALLS.getName(), streamData.getErrorCalls())
+ .field(ResponseTimeDistributionTable.SUCCESS_CALLS.getName(), streamData.getSuccessCalls())
- target.put(ResponseTimeDistributionTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
- return target;
+ .field(ResponseTimeDistributionTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+ .endObject();
}
@GraphComputingMetric(name = "/persistence/get/" + ResponseTimeDistributionTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/smp/AbstractServiceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/smp/AbstractServiceMetricEsPersistenceDAO.java
index 4530ccc..b07f6a5 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/smp/AbstractServiceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/smp/AbstractServiceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.smp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetric;
-import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.service.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -53,16 +53,17 @@ public abstract class AbstractServiceMetricEsPersistenceDAO extends AbstractPers
return serviceMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(ServiceMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ServiceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+ @Override protected final XContentBuilder esStreamDataToEsData(ServiceMetric streamData) throws IOException {
+ XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+ .field(ServiceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
- target.put(ServiceMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId());
- target.put(ServiceMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
- target.put(ServiceMetricTable.SERVICE_ID.getName(), streamData.getServiceId());
+ .field(ServiceMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+ .field(ServiceMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+ .field(ServiceMetricTable.SERVICE_ID.getName(), streamData.getServiceId());
MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+ target.endObject();
return target;
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/srmp/AbstractServiceReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/srmp/AbstractServiceReferenceMetricEsPersistenceDAO.java
index 3a1a8df..1d9078a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/srmp/AbstractServiceReferenceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/srmp/AbstractServiceReferenceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.srmp;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
-import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.service.*;
+import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
@@ -56,19 +56,20 @@ public abstract class AbstractServiceReferenceMetricEsPersistenceDAO extends Abs
return serviceReferenceMetric;
}
- @Override protected final Map<String, Object> esStreamDataToEsData(ServiceReferenceMetric streamData) {
- Map<String, Object> target = new HashMap<>();
- target.put(ServiceReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
-
- target.put(ServiceReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
- target.put(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
- target.put(ServiceReferenceMetricTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
- target.put(ServiceReferenceMetricTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
- target.put(ServiceReferenceMetricTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId());
- target.put(ServiceReferenceMetricTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId());
+ @Override
+ protected final XContentBuilder esStreamDataToEsData(ServiceReferenceMetric streamData) throws IOException {
+ XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+ .field(ServiceReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
+ .field(ServiceReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+ .field(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+ .field(ServiceReferenceMetricTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+ .field(ServiceReferenceMetricTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+ .field(ServiceReferenceMetricTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId())
+ .field(ServiceReferenceMetricTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId());
MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+ target.endObject();
return target;
}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationComponentEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationComponentEsUIDAO.java
index 93ee042..a2ce032 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationComponentEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationComponentEsUIDAO.java
@@ -52,7 +52,7 @@ public class ApplicationComponentEsUIDAO extends EsDAO implements IApplicationCo
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
searchRequestBuilder.setTypes(ApplicationComponentTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
- searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(ApplicationComponentTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket));
+ searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery(ApplicationComponentTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket));
searchRequestBuilder.setSize(0);
searchRequestBuilder.addAggregation(AggregationBuilders.terms(ApplicationComponentTable.COMPONENT_ID.getName()).field(ApplicationComponentTable.COMPONENT_ID.getName()).size(100)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMappingEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMappingEsUIDAO.java
index 4164474..676263f 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMappingEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMappingEsUIDAO.java
@@ -52,7 +52,7 @@ public class ApplicationMappingEsUIDAO extends EsDAO implements IApplicationMapp
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
searchRequestBuilder.setTypes(ApplicationMappingTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
- searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(ApplicationMappingTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket));
+ searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery(ApplicationMappingTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket));
searchRequestBuilder.setSize(0);
searchRequestBuilder.addAggregation(
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
index 0fc5d87..4da0071 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
@@ -104,7 +104,7 @@ public class ApplicationMetricEsUIDAO extends EsDAO implements IApplicationMetri
boolQuery.must().add(QueryBuilders.rangeQuery(ApplicationMetricTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket));
boolQuery.must().add(QueryBuilders.termQuery(ApplicationMetricTable.SOURCE_VALUE.getName(), metricSource.getValue()));
- searchRequestBuilder.setQuery(boolQuery);
+ searchRequestBuilder.setPostFilter(boolQuery);
searchRequestBuilder.setSize(0);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.APPLICATION_ID.getName()).field(ApplicationMetricTable.APPLICATION_ID.getName()).size(100);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationReferenceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationReferenceMetricEsUIDAO.java
index 3dd84de..169f067 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationReferenceMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationReferenceMetricEsUIDAO.java
@@ -71,7 +71,7 @@ public class ApplicationReferenceMetricEsUIDAO extends EsDAO implements IApplica
boolQuery.must().add(applicationBoolQuery);
}
- searchRequestBuilder.setQuery(boolQuery);
+ searchRequestBuilder.setPostFilter(boolQuery);
searchRequestBuilder.setSize(0);
return buildMetrics(searchRequestBuilder);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceEsUIDAO.java
index 30f20ef..00e0ff9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceEsUIDAO.java
@@ -176,7 +176,7 @@ public class InstanceEsUIDAO extends EsDAO implements IInstanceUIDAO {
timeBoolQuery.should().add(boolQuery2);
boolQuery.must().add(timeBoolQuery);
- searchRequestBuilder.setQuery(boolQuery);
+ searchRequestBuilder.setPostFilter(boolQuery);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
SearchHit[] searchHits = searchResponse.getHits().getHits();
@@ -190,7 +190,7 @@ public class InstanceEsUIDAO extends EsDAO implements IInstanceUIDAO {
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setSize(1);
- searchRequestBuilder.setQuery(QueryBuilders.termQuery(InstanceTable.APPLICATION_ID.getName(), applicationId));
+ searchRequestBuilder.setPostFilter(QueryBuilders.termQuery(InstanceTable.APPLICATION_ID.getName(), applicationId));
searchRequestBuilder.addSort(SortBuilders.fieldSort(InstanceTable.REGISTER_TIME.getName()).order(SortOrder.ASC));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
@@ -210,7 +210,7 @@ public class InstanceEsUIDAO extends EsDAO implements IInstanceUIDAO {
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setSize(1);
- searchRequestBuilder.setQuery(QueryBuilders.termQuery(InstanceTable.APPLICATION_ID.getName(), applicationId));
+ searchRequestBuilder.setPostFilter(QueryBuilders.termQuery(InstanceTable.APPLICATION_ID.getName(), applicationId));
searchRequestBuilder.addSort(SortBuilders.fieldSort(InstanceTable.HEARTBEAT_TIME.getName()).order(SortOrder.DESC));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/dao/BatchH2DAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/dao/BatchH2DAO.java
index 3a11e32..0de2a2c 100644
--- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/dao/BatchH2DAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/dao/BatchH2DAO.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
*/
public class BatchH2DAO extends H2DAO implements IBatchDAO {
- private final Logger logger = LoggerFactory.getLogger(BatchH2DAO.class);
+ private static final Logger logger = LoggerFactory.getLogger(BatchH2DAO.class);
public BatchH2DAO(H2Client client) {
super(client);
@@ -44,7 +44,10 @@ public class BatchH2DAO extends H2DAO implements IBatchDAO {
@Override
public void batchPersistence(List<?> batchCollection) {
if (batchCollection != null && batchCollection.size() > 0) {
- logger.debug("the batch collection size is {}", batchCollection.size());
+ if (logger.isDebugEnabled()) {
+ logger.debug("the batch collection size is {}", batchCollection.size());
+ }
+
Connection conn;
final Map<String, PreparedStatement> batchSqls = new HashMap<>();
try {
@@ -63,7 +66,10 @@ public class BatchH2DAO extends H2DAO implements IBatchDAO {
Object[] params = e.getParams();
if (params != null) {
- logger.debug("the sql is {}, params size is {}, params: {}", e.getSql(), params.length, params);
+ if (logger.isDebugEnabled()) {
+ logger.debug("the sql is {}, params size is {}, params: {}", e.getSql(), params.length, params);
+ }
+
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java
index 56f36c4..2b4ea52 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java
@@ -19,43 +19,23 @@
package org.apache.skywalking.apm.collector.ui.jetty.handler;
import com.coxautodev.graphql.tools.SchemaParser;
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
+import com.google.gson.*;
import com.google.gson.reflect.TypeToken;
-import graphql.ExecutionInput;
-import graphql.ExecutionResult;
-import graphql.GraphQL;
-import graphql.GraphQLError;
+import graphql.*;
import graphql.schema.GraphQLSchema;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.Type;
-import java.util.List;
-import java.util.Map;
+import java.io.*;
+import java.util.*;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
-import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
-import org.apache.skywalking.apm.collector.storage.ui.application.ApplicationNode;
-import org.apache.skywalking.apm.collector.storage.ui.application.ConjecturalNode;
+import org.apache.skywalking.apm.collector.storage.ui.application.*;
import org.apache.skywalking.apm.collector.storage.ui.common.VisualUserNode;
import org.apache.skywalking.apm.collector.storage.ui.service.ServiceNode;
-import org.apache.skywalking.apm.collector.ui.graphql.VersionMutation;
-import org.apache.skywalking.apm.collector.ui.graphql.VersionQuery;
+import org.apache.skywalking.apm.collector.ui.graphql.*;
import org.apache.skywalking.apm.collector.ui.mutation.ConfigMutation;
-import org.apache.skywalking.apm.collector.ui.query.AlarmQuery;
-import org.apache.skywalking.apm.collector.ui.query.ApplicationQuery;
-import org.apache.skywalking.apm.collector.ui.query.ConfigQuery;
-import org.apache.skywalking.apm.collector.ui.query.OverViewLayerQuery;
-import org.apache.skywalking.apm.collector.ui.query.ServerQuery;
-import org.apache.skywalking.apm.collector.ui.query.ServiceQuery;
-import org.apache.skywalking.apm.collector.ui.query.TraceQuery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.apm.collector.ui.query.*;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -96,30 +76,29 @@ public class GraphQLHandler extends JettyJsonHandler {
return "/graphql";
}
- @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
+ @Override protected JsonElement doGet(HttpServletRequest req) {
return execute(req.getParameter(QUERY), null);
}
- @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException, IOException {
+ @Override protected JsonElement doPost(HttpServletRequest req) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
String line;
- String request = "";
+ StringBuilder request = new StringBuilder();
while ((line = reader.readLine()) != null) {
- request += line;
+ request.append(line);
}
- JsonObject requestJson = gson.fromJson(request, JsonObject.class);
+ JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);
- Type mapType = new TypeToken<Map<String, Object>>() { }.getType();
-
- return execute(requestJson.get(QUERY).getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapType));
+ return execute(requestJson.get(QUERY).getAsString(), gson.fromJson(requestJson.get(VARIABLES), new TypeToken<Map<String, Object>>() {
+ }.getType()));
}
private JsonObject execute(String request, Map<String, Object> variables) {
try {
ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(request).variables(variables).build();
ExecutionResult executionResult = graphQL.execute(executionInput);
- logger.info("Execution result is {}", executionResult);
+ logger.debug("Execution result is {}", executionResult);
Object data = executionResult.getData();
List<GraphQLError> errors = executionResult.getErrors();
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ApplicationTopologyService.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ApplicationTopologyService.java
index 61159c3..535f49c 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ApplicationTopologyService.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ApplicationTopologyService.java
@@ -75,7 +75,7 @@ public class ApplicationTopologyService {
TopologyBuilder builder = new TopologyBuilder(moduleManager);
- Topology topology = builder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, step, startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket);
+ Topology topology = builder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, startSecondTimeBucket, endSecondTimeBucket);
Set<Integer> nodeIds = new HashSet<>();
topology.getCalls().forEach(call -> {
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ClusterTopologyService.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ClusterTopologyService.java
index 6616df6..d0d1f04 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ClusterTopologyService.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ClusterTopologyService.java
@@ -67,6 +67,6 @@ public class ClusterTopologyService {
TopologyBuilder builder = new TopologyBuilder(moduleManager);
- return builder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, step, startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket);
+ return builder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, startSecondTimeBucket, endSecondTimeBucket);
}
}
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilder.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilder.java
index 6ef3f36..c23f579 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilder.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilder.java
@@ -28,7 +28,6 @@ import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.*;
import org.apache.skywalking.apm.collector.storage.dao.ui.*;
import org.apache.skywalking.apm.collector.storage.table.register.Application;
-import org.apache.skywalking.apm.collector.storage.ui.alarm.Alarm;
import org.apache.skywalking.apm.collector.storage.ui.application.*;
import org.apache.skywalking.apm.collector.storage.ui.common.*;
import org.apache.skywalking.apm.collector.ui.utils.*;
@@ -44,14 +43,12 @@ class TopologyBuilder {
private final ApplicationCacheService applicationCacheService;
private final ServerService serverService;
private final DateBetweenService dateBetweenService;
- private final AlarmService alarmService;
private final IComponentLibraryCatalogService componentLibraryCatalogService;
TopologyBuilder(ModuleManager moduleManager) {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
this.serverService = new ServerService(moduleManager);
this.dateBetweenService = new DateBetweenService(moduleManager);
- this.alarmService = new AlarmService(moduleManager);
this.componentLibraryCatalogService = moduleManager.find(ConfigurationModule.NAME).getService(IComponentLibraryCatalogService.class);
}
@@ -60,17 +57,17 @@ class TopologyBuilder {
List<IApplicationMetricUIDAO.ApplicationMetric> applicationMetrics,
List<IApplicationReferenceMetricUIDAO.ApplicationReferenceMetric> callerReferenceMetric,
List<IApplicationReferenceMetricUIDAO.ApplicationReferenceMetric> calleeReferenceMetric,
- Step step, long startTimeBucket, long endTimeBucket, long startSecondTimeBucket, long endSecondTimeBucket) {
+ long startSecondTimeBucket, long endSecondTimeBucket) {
Map<Integer, String> nodeCompMap = buildNodeCompMap(applicationComponents);
Map<Integer, String> conjecturalNodeCompMap = buildConjecturalNodeCompMap(applicationComponents);
Map<Integer, Integer> mappings = changeMapping2Map(applicationMappings);
-
filterZeroSourceOrTargetReference(callerReferenceMetric);
filterZeroSourceOrTargetReference(calleeReferenceMetric);
-
calleeReferenceMetric = calleeReferenceMetricFilter(calleeReferenceMetric);
List<Node> nodes = new LinkedList<>();
+ Map<Integer, Integer> applicationMinuteBetweenMap = new HashMap<>();
+ Map<Integer, Integer> numberOfServer = new HashMap<>();
applicationMetrics.forEach(applicationMetric -> {
int applicationId = applicationMetric.getId();
Application application = applicationCacheService.getApplicationById(applicationId);
@@ -81,23 +78,14 @@ class TopologyBuilder {
applicationNode.setSla(SLACalculator.INSTANCE.calculate(applicationMetric.getErrorCalls(), applicationMetric.getCalls()));
try {
- applicationNode.setCpm(applicationMetric.getCalls() / dateBetweenService.minutesBetween(applicationId, startSecondTimeBucket, endSecondTimeBucket));
+ applicationNode.setCpm(applicationMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, applicationId, startSecondTimeBucket, endSecondTimeBucket));
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
applicationNode.setAvgResponseTime(applicationMetric.getDurations() / applicationMetric.getCalls());
applicationNode.setApdex(ApdexCalculator.INSTANCE.calculate(applicationMetric.getSatisfiedCount(), applicationMetric.getToleratingCount(), applicationMetric.getFrustratedCount()));
applicationNode.setAlarm(false);
- try {
- Alarm alarm = alarmService.loadApplicationAlarmList(Const.EMPTY_STRING, applicationId, step, startTimeBucket, endTimeBucket, 1, 0);
- if (alarm.getItems().size() > 0) {
- applicationNode.setAlarm(true);
- }
- } catch (ParseException e) {
- logger.error(e.getMessage(), e);
- }
-
- applicationNode.setNumOfServer(serverService.getAllServer(applicationId, startSecondTimeBucket, endSecondTimeBucket).size());
+ applicationNode.setNumOfServer(getNumberOfServer(numberOfServer, applicationId, startSecondTimeBucket, endSecondTimeBucket));
nodes.add(applicationNode);
});
@@ -139,7 +127,7 @@ class TopologyBuilder {
call.setAlert(false);
call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
try {
- call.setCpm(referenceMetric.getCalls() / dateBetweenService.minutesBetween(source.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket));
+ call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, source.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket));
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
@@ -186,7 +174,7 @@ class TopologyBuilder {
call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
}
try {
- call.setCpm(referenceMetric.getCalls() / dateBetweenService.minutesBetween(target.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket));
+ call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, target.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket));
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
@@ -257,4 +245,28 @@ class TopologyBuilder {
}
}
}
+
+ private int getApplicationMinuteBetween(Map<Integer, Integer> applicationMinuteBetweenMap, int applicationId,
+ long startSecondTimeBucket,
+ long endSecondTimeBucket) throws ParseException {
+ if (applicationMinuteBetweenMap.containsKey(applicationId)) {
+ return applicationMinuteBetweenMap.get(applicationId);
+ } else {
+ int applicationMinuteBetween = dateBetweenService.minutesBetween(applicationId, startSecondTimeBucket, endSecondTimeBucket);
+ applicationMinuteBetweenMap.put(applicationId, applicationMinuteBetween);
+ return applicationMinuteBetween;
+ }
+ }
+
+ private int getNumberOfServer(Map<Integer, Integer> numberOfServerMap, int applicationId,
+ long startSecondTimeBucket,
+ long endSecondTimeBucket) {
+ if (numberOfServerMap.containsKey(applicationId)) {
+ return numberOfServerMap.get(applicationId);
+ } else {
+ int numberOfServer = serverService.getAllServer(applicationId, startSecondTimeBucket, endSecondTimeBucket).size();
+ numberOfServerMap.put(applicationId, numberOfServer);
+ return numberOfServer;
+ }
+ }
}
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilderTest.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilderTest.java
index 35d23e3..d04376e 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilderTest.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilderTest.java
@@ -59,7 +59,6 @@ public class TopologyBuilderTest {
alarmService = mock(AlarmService.class);
dateBetweenService = mock(DateBetweenService.class);
Whitebox.setInternalState(topologyBuilder, "applicationCacheService", applicationCacheService);
- Whitebox.setInternalState(topologyBuilder, "alarmService", alarmService);
Whitebox.setInternalState(topologyBuilder, "dateBetweenService", dateBetweenService);
duration = new Duration();
duration.setEnd("2018-02");
@@ -129,7 +128,7 @@ public class TopologyBuilderTest {
return alarm;
});
when(dateBetweenService.minutesBetween(anyInt(), anyLong(), anyLong())).then(invocation -> 20L);
- Topology topology = topologyBuilder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, duration.getStep(), startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket);
+ Topology topology = topologyBuilder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, startSecondTimeBucket, endSecondTimeBucket);
Assert.assertNotNull(topology);
}