You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/02/13 13:11:19 UTC
[skywalking] branch master updated: Help people to read source
codes of core module. (#4357)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 6be3e99 Help people to read source codes of core module. (#4357)
6be3e99 is described below
commit 6be3e9909aac851faa9816a869047854d8c8ca2e
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Feb 13 21:11:05 2020 +0800
Help people to read source codes of core module. (#4357)
* Add more comments in the core module, should help the source code readers a lot.
---
.../oap/server/starter/OAPServerBootstrap.java | 14 ++---
.../skywalking/oap/server/core/CoreModule.java | 3 ++
.../oap/server/core/CoreModuleProvider.java | 55 ++++++++++++++------
.../oap/server/core/analysis/Stream.java | 26 +++++++++-
.../core/analysis/StreamAnnotationListener.java | 3 ++
.../server/core/analysis/worker/ExportWorker.java | 4 ++
.../analysis/worker/MetricsAggregateWorker.java | 16 ++++--
.../analysis/worker/MetricsPersistentWorker.java | 28 +++++++---
.../core/analysis/worker/MetricsRemoteWorker.java | 12 ++---
.../analysis/worker/MetricsStreamProcessor.java | 60 ++++++++++++++++++----
.../core/analysis/worker/MetricsTransWorker.java | 38 ++++++++++----
.../oap/server/core/command/CommandService.java | 10 ++--
.../server/core/query/AggregationQueryService.java | 12 ++---
.../oap/server/core/query/MetricQueryService.java | 10 ++--
.../register/worker/InventoryStreamProcessor.java | 30 +++++++++--
.../oap/server/core/remote/Deserializable.java | 3 ++
.../server/core/remote/RemoteSenderService.java | 14 ++++-
.../server/core/remote/RemoteServiceHandler.java | 32 ++++++++++--
.../oap/server/core/remote/Serializable.java | 3 ++
.../skywalking/oap/server/core/storage/DAO.java | 3 ++
.../oap/server/core/storage/IBatchDAO.java | 19 ++++++-
.../oap/server/core/storage/IHistoryDeleteDAO.java | 12 ++++-
.../oap/server/core/storage/IMetricsDAO.java | 24 ++++++++-
.../oap/server/core/storage/IRecordDAO.java | 10 +++-
.../oap/server/core/storage/IRegisterDAO.java | 15 +++++-
.../oap/server/core/storage/StorageBuilder.java | 5 ++
.../oap/server/core/storage/StorageDAO.java | 3 ++
.../oap/server/core/storage/StorageModule.java | 4 ++
.../oap/server/core/storage/annotation/Column.java | 8 +++
...alueColumnIds.java => ValueColumnMetadata.java} | 19 +++++--
.../server/core/storage/model/ModelInstaller.java | 36 +++++++++----
.../server/core/storage/model/StorageModels.java | 4 +-
.../core/storage/ttl/DataTTLKeeperTimer.java | 20 +++++++-
.../oap/server/core/worker/AbstractWorker.java | 9 ++++
.../server/core/worker/IWorkerInstanceGetter.java | 4 +-
.../server/core/worker/IWorkerInstanceSetter.java | 9 ++++
.../handler/v6/grpc/RegisterServiceHandler.java | 18 +++++--
.../v6/grpc/ServiceInstancePingServiceHandler.java | 23 +++++----
.../oap/server/starter/OAPServerStartUp.java | 5 +-
.../oap/server/starter/OAPServerStartUp.java | 4 +-
.../plugin/jdbc/h2/dao/H2TableInstaller.java | 24 +++++----
.../plugin/jdbc/mysql/MySQLTableInstaller.java | 22 +++++---
42 files changed, 530 insertions(+), 143 deletions(-)
diff --git a/oap-server/server-bootstrap/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerBootstrap.java b/oap-server/server-bootstrap/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerBootstrap.java
index 3614427..3e893e4 100644
--- a/oap-server/server-bootstrap/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerBootstrap.java
+++ b/oap-server/server-bootstrap/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerBootstrap.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.starter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -25,13 +26,12 @@ import org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * Starter core. Load the core configuration file, and initialize the startup sequence through {@link ModuleManager}.
+ */
+@Slf4j
public class OAPServerBootstrap {
-
- private static final Logger logger = LoggerFactory.getLogger(OAPServerBootstrap.class);
-
public static void start() {
String mode = System.getProperty("mode");
RunningMode.setMode(mode);
@@ -50,11 +50,11 @@ public class OAPServerBootstrap {
.setValue(System.currentTimeMillis() / 1000d);
if (RunningMode.isInitMode()) {
- logger.info("OAP starts up in init mode successfully, exit now...");
+ log.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
}
} catch (Throwable t) {
- logger.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
System.exit(1);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index ce71372..e7fa4af 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -55,6 +55,9 @@ import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+/**
+ * Core module definition. Define all open services to other modules.
+ */
public class CoreModule extends ModuleDefine {
public static final String NAME = "core";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9069cb0..8f8d2b9 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -93,6 +93,14 @@ import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+/**
+ * Core module provider includes the recommended and default implementations of {@link CoreModule#services()}. All
+ * services with these default implementations are widely used including data receiver, data analysis, streaming
+ * process, storage and query.
+ *
+ * NOTICE. In our experience, no one should re-implement the core module service implementations unless they are very
+ * familiar with all mechanisms of SkyWalking.
+ */
public class CoreModuleProvider extends ModuleProvider {
private final CoreModuleConfig moduleConfig;
@@ -169,12 +177,14 @@ public class CoreModuleProvider extends ModuleProvider {
}
grpcServer.initialize();
- jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig
+ jettyServer = new JettyServer(
+ moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig
.getJettySelectors());
jettyServer.initialize();
this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
- this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
+ this.registerServiceImplementation(
+ DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
@@ -192,17 +202,24 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(IModelGetter.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels);
- this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager(), moduleConfig));
+ this.registerServiceImplementation(
+ ServiceInventoryCache.class, new ServiceInventoryCache(getManager(), moduleConfig));
this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));
- this.registerServiceImplementation(ServiceInstanceInventoryCache.class, new ServiceInstanceInventoryCache(getManager(), moduleConfig));
- this.registerServiceImplementation(IServiceInstanceInventoryRegister.class, new ServiceInstanceInventoryRegister(getManager()));
+ this.registerServiceImplementation(
+ ServiceInstanceInventoryCache.class, new ServiceInstanceInventoryCache(getManager(), moduleConfig));
+ this.registerServiceImplementation(
+ IServiceInstanceInventoryRegister.class, new ServiceInstanceInventoryRegister(getManager()));
- this.registerServiceImplementation(EndpointInventoryCache.class, new EndpointInventoryCache(getManager(), moduleConfig));
- this.registerServiceImplementation(IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));
+ this.registerServiceImplementation(
+ EndpointInventoryCache.class, new EndpointInventoryCache(getManager(), moduleConfig));
+ this.registerServiceImplementation(
+ IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));
- this.registerServiceImplementation(NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager(), moduleConfig));
- this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
+ this.registerServiceImplementation(
+ NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager(), moduleConfig));
+ this.registerServiceImplementation(
+ INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
@@ -214,8 +231,10 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
// add profile service implementations
- this.registerServiceImplementation(ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
- this.registerServiceImplementation(ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager(), moduleConfig));
+ this.registerServiceImplementation(
+ ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
+ this.registerServiceImplementation(
+ ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager(), moduleConfig));
this.registerServiceImplementation(ProfileTaskCache.class, new ProfileTaskCache(getManager(), moduleConfig));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
@@ -248,10 +267,13 @@ public class CoreModuleProvider extends ModuleProvider {
}
if (CoreModuleConfig.Role.Mixed.name()
- .equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name()
- .equalsIgnoreCase(moduleConfig
- .getRole())) {
- RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
+ .equalsIgnoreCase(
+ moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name()
+ .equalsIgnoreCase(
+ moduleConfig
+ .getRole())) {
+ RemoteInstance gRPCServerInstance = new RemoteInstance(
+ new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager()
.find(ClusterModule.NAME)
.provider()
@@ -261,7 +283,8 @@ public class CoreModuleProvider extends ModuleProvider {
DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
.provider()
- .getService(DynamicConfigurationService.class);
+ .getService(
+ DynamicConfigurationService.class);
dynamicConfigurationService.registerConfigChangeWatcher(apdexThresholdConfig);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
index 74709f7..0cf0f38 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
@@ -22,17 +22,41 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamingProcessor;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
+import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+/**
+ * Stream annotation represents a metadata definition. Include the key values of the distributed streaming calculation.
+ * See {@link MetricsStreamProcessor}, {@link RecordStreamProcessor}, {@link InventoryStreamProcessor}, {@link
+ * TopNStreamProcessor} and {@link NoneStreamingProcessor} for more details.
+ */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Stream {
-
+ /**
+ * @return name of this stream definition.
+ */
String name();
+ /**
+ * @return scope id, see {@link ScopeDeclaration}
+ */
int scopeId();
+ /**
+ * @return the converter type between {@link StorageBuilder} and {@link Map} for persistence.
+ */
Class<? extends StorageBuilder> builder();
+ /**
+ * @return the stream processor type, see {@link MetricsStreamProcessor}, {@link RecordStreamProcessor}, {@link
+ * InventoryStreamProcessor}, {@link TopNStreamProcessor} and {@link NoneStreamingProcessor} for more details.
+ */
Class<? extends StreamProcessor> processor();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
index 154a439..13abca6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
@@ -28,6 +28,9 @@ import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+/**
+ * Stream annotation listener, process the class with {@link Stream} annotation.
+ */
public class StreamAnnotationListener implements AnnotationListener {
private final ModuleDefineHolder moduleDefineHolder;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
index 822a44e..63090bc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
@@ -24,6 +24,10 @@ import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+/**
+ * A bridge worker. If the {@link ExporterModule} provider is declared and provides an implementation of {@link
+ * MetricValuesExportService}, forward the export data to it.
+ */
public class ExportWorker extends AbstractWorker<ExportEvent> {
private MetricValuesExportService exportService;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 7daebc8..f6f944a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -36,6 +36,12 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * MetricsAggregateWorker provides an in-memory metrics merging capability. This aggregation is called L1 aggregation;
+ * it merges the data just after the receiver analysis. For metrics belonging to the same entity, metrics type and time
+ * bucket, the L1 aggregation merges them into one metrics object to reduce unnecessary memory and network
+ * payload.
+ */
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private static final Logger logger = LoggerFactory.getLogger(MetricsAggregateWorker.class);
@@ -46,14 +52,15 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private CounterMetrics aggregationCounter;
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
- String modelName) {
+ String modelName) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
String name = "METRICS_L1_AGGREGATION";
this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(
+ name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
@@ -64,7 +71,10 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
- aggregationCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "min"));
+ aggregationCounter = metricsCreator.createCounter(
+ "metrics_aggregation", "The number of rows in aggregation",
+ new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "min")
+ );
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 263cac4..18e808b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
@@ -35,17 +36,17 @@ import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * MetricsPersistentWorker is an extension of {@link PersistenceWorker} and focuses on persisting the Metrics data.
+ */
+@Slf4j
public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDataCache<Metrics>> {
-
- private static final Logger logger = LoggerFactory.getLogger(MetricsPersistentWorker.class);
-
private final Model model;
private final Map<Metrics, Metrics> databaseSession;
private final MergeDataCache<Metrics> mergeDataCache;
@@ -90,6 +91,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
cacheData(metrics);
}
+ /**
+ * Accept all metrics data and push them into the queue for serial processing
+ */
@Override
public void in(Metrics metrics) {
dataCarrier.produce(metrics);
@@ -144,7 +148,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
}
} catch (Throwable t) {
- logger.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
}
}
@@ -152,7 +156,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
if (prepareRequests.size() > 0) {
- logger.debug(
+ log.debug(
"prepare batch requests for model {}, took time: {}", model.getName(),
System.currentTimeMillis() - start
);
@@ -184,6 +188,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
mergeDataCache.finishWriting();
}
+ /**
+ * Sync data to the cache if the {@link #enableDatabaseSession} == true.
+ */
private void syncStorageToCache(Metrics[] metrics) throws IOException {
if (!enableDatabaseSession) {
databaseSession.clear();
@@ -219,6 +226,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
}
+ /**
+ * Metrics queue processor, merges the received metrics if one with the same ID(s) and time bucket already exists.
+ *
+ * ID is declared through {@link IDColumn}
+ */
private class PersistentConsumer implements IConsumer<Metrics> {
private final MetricsPersistentWorker persistent;
@@ -239,7 +251,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
@Override
public void onError(List<Metrics> data, Throwable t) {
- logger.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
index 782338b..3722340 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
@@ -18,19 +18,19 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * MetricsRemoteWorker forwards the metrics to the target OAP node.
+ */
+@Slf4j
public class MetricsRemoteWorker extends AbstractWorker<Metrics> {
-
- private static final Logger logger = LoggerFactory.getLogger(MetricsRemoteWorker.class);
-
private final RemoteSenderService remoteSender;
private final String remoteReceiverWorkerName;
@@ -45,7 +45,7 @@ public class MetricsRemoteWorker extends AbstractWorker<Metrics> {
try {
remoteSender.send(remoteReceiverWorkerName, metrics, Selector.HashCode);
} catch (Throwable e) {
- logger.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 2075b24..54c2861 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -41,13 +41,33 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+/**
+ * MetricsStreamProcessor represents the entrance and creator of the metrics streaming aggregation work flow.
+ *
+ * {@link #in(Metrics)} provides the major entrance for metrics streaming calculation.
+ *
+ * {@link #create(ModuleDefineHolder, Stream, Class)} creates the workers and work flow for every metrics.
+ */
public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
-
+ /**
+ * Singleton instance.
+ */
private final static MetricsStreamProcessor PROCESSOR = new MetricsStreamProcessor();
+ /**
+ * Worker table hosts all entrance workers.
+ */
private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
+
+ /**
+ * Worker table hosts all persistent workers.
+ */
@Getter
private List<MetricsPersistentWorker> persistentWorkers = new ArrayList<>();
+
+ /**
+ * Hold and forward CoreModuleConfig#enableDatabaseSession to the persistent worker.
+ */
@Setter
@Getter
private boolean enableDatabaseSession;
@@ -63,6 +83,13 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
}
}
+ /**
+ * Create the workers and work flow for every metrics.
+ *
+ * @param moduleDefineHolder pointer of the module define.
+ * @param stream definition of the metrics class.
+ * @param metricsClass data type of the streaming calculation.
+ */
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
@@ -87,22 +114,28 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker monthPersistentWorker = null;
if (configService.shouldToHour()) {
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false);
+ Model model = modelSetter.putIfAbsent(
+ metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false);
hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToDay()) {
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false);
+ Model model = modelSetter.putIfAbsent(
+ metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false);
dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToMonth()) {
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false);
+ Model model = modelSetter.putIfAbsent(
+ metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false);
monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
- MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
+ MetricsTransWorker transWorker = new MetricsTransWorker(
+ moduleDefineHolder, stream.name(), hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
- MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model, transWorker);
+ Model model = modelSetter.putIfAbsent(
+ metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
+ MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
+ moduleDefineHolder, metricsDAO, model, transWorker);
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
@@ -111,24 +144,29 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
- MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
+ MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
+ moduleDefineHolder, remoteWorker, stream.name());
entryWorkers.put(metricsClass, aggregateWorker);
}
private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder,
- IMetricsDAO metricsDAO, Model model, MetricsTransWorker transWorker) {
+ IMetricsDAO metricsDAO,
+ Model model,
+ MetricsTransWorker transWorker) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
- MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession);
+ MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
+ moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
- MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, null, enableDatabaseSession);
+ MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
+ moduleDefineHolder, model, metricsDAO, null, null, null, enableDatabaseSession);
persistentWorkers.add(persistentWorker);
return persistentWorker;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
index 5af1e14..f202700 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
@@ -26,13 +26,13 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * MetricsTransWorker transfers the metrics for downsampling. All streaming process metrics are in minute
+ * precision, but at the storage layer, in order to enhance the query performance, metrics could be saved in minute,
+ * hour, day and month precisions, with the set of them in use selected through CoreModuleConfig#downsampling.
+ */
public class MetricsTransWorker extends AbstractWorker<Metrics> {
-
- private static final Logger logger = LoggerFactory.getLogger(MetricsTransWorker.class);
-
private final MetricsPersistentWorker hourPersistenceWorker;
private final MetricsPersistentWorker dayPersistenceWorker;
private final MetricsPersistentWorker monthPersistenceWorker;
@@ -41,9 +41,11 @@ public class MetricsTransWorker extends AbstractWorker<Metrics> {
private final CounterMetrics aggregationDayCounter;
private final CounterMetrics aggregationMonthCounter;
- public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
- MetricsPersistentWorker hourPersistenceWorker, MetricsPersistentWorker dayPersistenceWorker,
- MetricsPersistentWorker monthPersistenceWorker) {
+ public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder,
+ String modelName,
+ MetricsPersistentWorker hourPersistenceWorker,
+ MetricsPersistentWorker dayPersistenceWorker,
+ MetricsPersistentWorker monthPersistenceWorker) {
super(moduleDefineHolder);
this.hourPersistenceWorker = hourPersistenceWorker;
this.dayPersistenceWorker = dayPersistenceWorker;
@@ -52,11 +54,25 @@ public class MetricsTransWorker extends AbstractWorker<Metrics> {
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
- aggregationHourCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "2", "hour"));
- aggregationDayCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "2", "day"));
- aggregationMonthCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "2", "month"));
+ aggregationHourCounter = metricsCreator.createCounter(
+ "metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level",
+ "dimensionality"
+ ), new MetricsTag.Values(modelName, "2", "hour"));
+ aggregationDayCounter = metricsCreator.createCounter(
+ "metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level",
+ "dimensionality"
+ ), new MetricsTag.Values(modelName, "2", "day"));
+ aggregationMonthCounter = metricsCreator.createCounter(
+ "metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level",
+ "dimensionality"
+ ), new MetricsTag.Values(modelName, "2", "month"));
}
+ /**
+ * Use the {@link Metrics#toHour()}, {@link Metrics#toDay()} and {@link Metrics#toMonth()} to clone a new metrics
+ * instance then process the downsampling. Then forward the data to different workers of different precisions for
+ * another round of aggregation/merging.
+ */
@Override
public void in(Metrics metrics) {
if (Objects.nonNull(hourPersistenceWorker)) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
index 9bd86eb..0f98e2d 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
@@ -25,6 +25,9 @@ import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
+/**
+ * CommandService represents the command creation factory. All commands for downstream agents should be created here.
+ */
public class CommandService implements Service {
private final ModuleManager moduleManager;
@@ -33,19 +36,20 @@ public class CommandService implements Service {
}
public ServiceResetCommand newResetCommand(final int serviceInstanceId, final long time,
- final String serviceInstanceUUID) {
+ final String serviceInstanceUUID) {
final String serialNumber = generateSerialNumber(serviceInstanceId, time, serviceInstanceUUID);
return new ServiceResetCommand(serialNumber);
}
public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) {
final String serialNumber = UUID.randomUUID().toString();
- return new ProfileTaskCommand(serialNumber, task.getId(), task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task
+ return new ProfileTaskCommand(
+ serialNumber, task.getId(), task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task
.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime());
}
private String generateSerialNumber(final int serviceInstanceId, final long time,
- final String serviceInstanceUUID) {
+ final String serviceInstanceUUID) {
return UUID.randomUUID().toString(); // Simply generate a uuid without taking care of the parameters
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
index 6397af0..7c8d644 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/AggregationQueryService.java
@@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnIds;
+import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
@@ -56,7 +56,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getServiceTopN(final String indName, final int topN, final Downsampling downsampling,
final long startTB, final long endTB, final Order order) throws IOException {
- List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceTopN(indName, ValueColumnIds.INSTANCE.getValueCName(indName), topN, downsampling, startTB, endTB, order);
+ List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceTopN(indName, ValueColumnMetadata.INSTANCE.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInventory inventory = moduleManager.find(CoreModule.NAME)
.provider()
@@ -71,7 +71,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getAllServiceInstanceTopN(final String indName, final int topN,
final Downsampling downsampling, final long startTB, final long endTB, final Order order) throws IOException {
- List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllServiceInstanceTopN(indName, ValueColumnIds.INSTANCE
+ List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllServiceInstanceTopN(indName, ValueColumnMetadata.INSTANCE
.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInstanceInventory inventory = moduleManager.find(CoreModule.NAME)
@@ -87,7 +87,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getServiceInstanceTopN(final int serviceId, final String indName, final int topN,
final Downsampling downsampling, final long startTB, final long endTB, final Order order) throws IOException {
- List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceInstanceTopN(serviceId, indName, ValueColumnIds.INSTANCE
+ List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceInstanceTopN(serviceId, indName, ValueColumnMetadata.INSTANCE
.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInstanceInventory inventory = moduleManager.find(CoreModule.NAME)
@@ -103,7 +103,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getAllEndpointTopN(final String indName, final int topN, final Downsampling downsampling,
final long startTB, final long endTB, final Order order) throws IOException {
- List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllEndpointTopN(indName, ValueColumnIds.INSTANCE.getValueCName(indName), topN, downsampling, startTB, endTB, order);
+ List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllEndpointTopN(indName, ValueColumnMetadata.INSTANCE.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
EndpointInventory inventory = moduleManager.find(CoreModule.NAME)
.provider()
@@ -118,7 +118,7 @@ public class AggregationQueryService implements Service {
public List<TopNEntity> getEndpointTopN(final int serviceId, final String indName, final int topN,
final Downsampling downsampling, final long startTB, final long endTB, final Order order) throws IOException {
- List<TopNEntity> topNEntities = getAggregationQueryDAO().getEndpointTopN(serviceId, indName, ValueColumnIds.INSTANCE
+ List<TopNEntity> topNEntities = getAggregationQueryDAO().getEndpointTopN(serviceId, indName, ValueColumnMetadata.INSTANCE
.getValueCName(indName), topN, downsampling, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
EndpointInventory inventory = moduleManager.find(CoreModule.NAME)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricQueryService.java
index edfc818..ea7d903 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricQueryService.java
@@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.query.entity.Thermodynamic;
import org.apache.skywalking.oap.server.core.query.sql.KeyValues;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnIds;
+import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
@@ -75,7 +75,7 @@ public class MetricQueryService implements Service {
where.getKeyValues().add(intKeyValues);
ids.forEach(intKeyValues.getValues()::add);
- return getMetricQueryDAO().getValues(metricsName, downsampling, startTB, endTB, where, ValueColumnIds.INSTANCE.getValueCName(metricsName), ValueColumnIds.INSTANCE
+ return getMetricQueryDAO().getValues(metricsName, downsampling, startTB, endTB, where, ValueColumnMetadata.INSTANCE.getValueCName(metricsName), ValueColumnMetadata.INSTANCE
.getValueFunction(metricsName));
}
@@ -89,7 +89,7 @@ public class MetricQueryService implements Service {
durationPoints.forEach(durationPoint -> ids.add(durationPoint.getPoint() + Const.ID_SPLIT + id));
}
- return getMetricQueryDAO().getLinearIntValues(indName, downsampling, ids, ValueColumnIds.INSTANCE.getValueCName(indName));
+ return getMetricQueryDAO().getLinearIntValues(indName, downsampling, ids, ValueColumnMetadata.INSTANCE.getValueCName(indName));
}
public List<IntValues> getMultipleLinearIntValues(final String indName, final String id, final int numOfLinear,
@@ -113,7 +113,7 @@ public class MetricQueryService implements Service {
durationPoints.forEach(durationPoint -> ids.add(durationPoint.getPoint() + Const.ID_SPLIT + id));
}
- IntValues[] multipleLinearIntValues = getMetricQueryDAO().getMultipleLinearIntValues(indName, downsampling, ids, linearIndex, ValueColumnIds.INSTANCE
+ IntValues[] multipleLinearIntValues = getMetricQueryDAO().getMultipleLinearIntValues(indName, downsampling, ids, linearIndex, ValueColumnMetadata.INSTANCE
.getValueCName(indName));
List<IntValues> response = new ArrayList<>(linearIndex.size());
@@ -133,6 +133,6 @@ public class MetricQueryService implements Service {
}
});
- return getMetricQueryDAO().getThermodynamic(indName, downsampling, ids, ValueColumnIds.INSTANCE.getValueCName(indName));
+ return getMetricQueryDAO().getThermodynamic(indName, downsampling, ids, ValueColumnMetadata.INSTANCE.getValueCName(indName));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index ab16bab..178f316 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -37,10 +37,23 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+/**
+ * InventoryStreamProcessor represents the entrance and creator of the inventory register work flow.
+ *
+ * Method #in provides the major entrance for inventory streaming merge, eventually add or update the
+ * inventory data in the storage.
+ *
+ * Method #create creates the workers and work flow for every inventory.
+ */
public class InventoryStreamProcessor implements StreamProcessor<RegisterSource> {
-
+ /**
+ * Singleton instance.
+ */
private static final InventoryStreamProcessor PROCESSOR = new InventoryStreamProcessor();
+ /**
+ * Worker table hosts all entrance workers.
+ */
private Map<Class<? extends RegisterSource>, RegisterDistinctWorker> entryWorkers = new HashMap<>();
public static InventoryStreamProcessor getInstance() {
@@ -51,9 +64,16 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
entryWorkers.get(registerSource.getClass()).in(registerSource);
}
+ /**
+ * Create the workers and work flow for every inventory.
+ *
+ * @param moduleDefineHolder pointer of the module define.
+ * @param stream definition of the inventory class.
+ * @param inventoryClass data type of the inventory.
+ */
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
- Class<? extends RegisterSource> inventoryClass) {
+ Class<? extends RegisterSource> inventoryClass) {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO;
try {
@@ -63,9 +83,11 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
- Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
+ Model model = modelSetter.putIfAbsent(
+ inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
- RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream
+ RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(
+ moduleDefineHolder, model.getName(), registerDAO, stream
.scopeId());
String remoteReceiverWorkerName = stream.name() + "_rec";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
index 14bdb75..729125f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.remote;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+/**
+ * Convert the {@link RemoteData} received from the network to the current data entity.
+ */
public interface Deserializable {
void deserialize(RemoteData remoteData);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
index 7c2fc5c..cd4a54e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
@@ -32,6 +32,10 @@ import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * RemoteSenderService represents a gRPC client to send metrics from one OAP node to another through network. It
+ * provides several routing modes to select the target OAP node.
+ */
public class RemoteSenderService implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class);
@@ -47,6 +51,13 @@ public class RemoteSenderService implements Service {
this.rollingSelector = new RollingSelector();
}
+ /**
+ * Send data to the target based on the given selector
+ *
+ * @param nextWorkName points to the worker that processes the data once {@link RemoteServiceHandler} receives it.
+ * @param streamData data to be sent
+ * @param selector strategy implementation to choose suitable OAP node.
+ */
public void send(String nextWorkName, StreamData streamData, Selector selector) {
RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME)
.provider()
@@ -55,7 +66,8 @@ public class RemoteSenderService implements Service {
List<RemoteClient> clientList = clientManager.getRemoteClient();
if (clientList.size() == 0) {
- logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
+ logger.warn(
+ "There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
return;
}
switch (selector) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index c2c033e..9fa2e50 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -61,21 +61,40 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
remoteInCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
- .createCounter("remote_in_count", "The number(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ .createCounter(
+ "remote_in_count",
+ "The number(server side) of inside remote inside aggregate rpc.",
+ MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+ );
remoteInErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
- .createCounter("remote_in_error_count", "The error number(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ .createCounter(
+ "remote_in_error_count",
+ "The error number(server side) of inside remote inside aggregate rpc.",
+ MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+ );
remoteInTargetNotFoundCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
- .createCounter("remote_in_target_not_found_count", "The error number(server side) of inside remote handler target worker not found. May be caused by unmatched OAL scrips.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ .createCounter(
+ "remote_in_target_not_found_count",
+ "The error number(server side) of inside remote handler target worker not found. May be caused by unmatched OAL scrips.",
+ MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+ );
remoteInHistogram = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
- .createHistogramMetric("remote_in_latency", "The latency(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ .createHistogramMetric(
+ "remote_in_latency",
+ "The latency(server side) of inside remote inside aggregate rpc.",
+ MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
+ );
}
+ /**
+ * gRPC handler of {@link RemoteServiceGrpc}. Continue the distributed aggregation at the current OAP node.
+ */
@Override
public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
if (Objects.isNull(workerInstanceGetter)) {
@@ -106,7 +125,10 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
nextWorker.in(streamData);
} else {
remoteInTargetNotFoundCounter.inc();
- logger.warn("Work name [{}] not found. Check OAL script, make sure they are same in the whole cluster.", nextWorkerName);
+ logger.warn(
+ "Work name [{}] not found. Check OAL script, make sure they are same in the whole cluster.",
+ nextWorkerName
+ );
}
} catch (Throwable t) {
remoteInErrorCounter.inc();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
index 382371c..d5dc58a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.remote;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+/**
+ * Convert the interface implementation to {@link RemoteData.Builder}, in order to send the data through network.
+ */
public interface Serializable {
RemoteData.Builder serialize();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
index dab0d49..9d066e4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
@@ -20,5 +20,8 @@ package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.library.module.Service;
+/**
+ * A specific interface for storage layer services.
+ */
public interface DAO extends Service {
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
index 883b984..c6bb736 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
@@ -22,9 +22,26 @@ import java.util.List;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+/**
+ * IBatchDAO provides two modes of data persistence supported by most databases, including synchronous and
+ * asynchronous.
+ */
public interface IBatchDAO extends DAO {
-
+ /**
+ * Push data into the database in async mode. This method is driven by streaming process. This method doesn't
+ * require the data to be queryable immediately after the method finishes.
+ *
+ * All data are written in append mode, with no modification.
+ *
+ * @param insertRequest data to insert.
+ */
void asynchronous(InsertRequest insertRequest);
+ /**
+ * Make all given PrepareRequests effective in the sync mode. All requests could be confirmed by the database. All
+ * changes are required to be queryable after the method returns.
+ *
+ * @param prepareRequests data to insert or update. No delete happens in streaming mode.
+ */
void synchronous(List<PrepareRequest> prepareRequests);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
index 2b7d826..d778af0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
@@ -19,9 +19,19 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+/**
+ * Remove all expired data based on TTL configurations.
+ */
public interface IHistoryDeleteDAO extends DAO {
-
+ /**
+ * Delete the data
+ *
+ * @param model data entity.
+ * @param timeBucketColumnName name of the column representing the time. Right now, always {@link Metrics#TIME_BUCKET}
+ * @throws IOException when error happens in the deletion process.
+ */
void deleteHistory(Model model, String timeBucketColumnName) throws IOException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index 2cc35dc..c5add94 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -25,11 +25,33 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+/**
+ * Metrics related DAO.
+ */
public interface IMetricsDAO extends DAO {
-
+ /**
+ * Read data from the storage by given IDs.
+ *
+ * @param model target entity of this query.
+ * @param ids ID list.
+ * @return the data of all given IDs. Only includes existing data. Not required to keep the same order as the ids list.
+ * @throws IOException when error occurs in data query.
+ */
List<Metrics> multiGet(Model model, List<String> ids) throws IOException;
+ /**
+ * Transfer the given metrics to an executable insert statement.
+ *
+ * @return InsertRequest should follow the database client driver datatype, in order to make sure it could be
+ * executed ASAP.
+ */
InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException;
+ /**
+ * Transfer the given metrics to an executable update statement.
+ *
+ * @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be
+ * executed ASAP.
+ */
UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
index 97343d5..8503b87 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
@@ -23,7 +23,15 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+/**
+ * DAO specifically for {@link Record} implementations.
+ */
public interface IRecordDAO extends DAO {
-
+ /**
+ * Transfer the given record to an executable insert statement.
+ *
+ * @return InsertRequest should follow the database client driver datatype, in order to make sure it could be
+ * executed ASAP.
+ */
InsertRequest prepareBatchInsert(Model model, Record record) throws IOException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
index 87fda70..5afb452 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
@@ -21,11 +21,24 @@ package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
+/**
+ * DAO specifically for {@link RegisterSource} implementations.
+ */
public interface IRegisterDAO extends DAO {
-
+ /**
+ * Read the RegisterSource by the given ID.
+ *
+ * @return RegisterSource instance or NULL if id doesn't exist.
+ */
RegisterSource get(String modelName, String id) throws IOException;
+ /**
+ * Do a blocking insert operation.
+ */
void forceInsert(String modelName, RegisterSource source) throws IOException;
+ /**
+ * Do a blocking update operation.
+ */
void forceUpdate(String modelName, RegisterSource source) throws IOException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
index 0a07a17..f4746b8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
@@ -20,6 +20,11 @@ package org.apache.skywalking.oap.server.core.storage;
import java.util.Map;
+/**
+ * Converter between the given T and Map.
+ *
+ * @param <T> A storage entity implementation.
+ */
public interface StorageBuilder<T extends StorageData> {
T map2Data(Map<String, Object> dbMap);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
index 45bbb72..89525a8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
@@ -24,6 +24,9 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.library.module.Service;
+/**
+ * StorageDAO is a DAO factory for storage layer. Provide the implementations of typical DAO interfaces.
+ */
public interface StorageDAO extends Service {
IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 9f889db..a3194b6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -35,6 +35,10 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+/**
+ * StorageModule provides the capabilities(services) to interact with the database. With different databases, this
+ * module could have different providers, such as currently, H2, MySQL, ES, TiDB.
+ */
public class StorageModule extends ModuleDefine {
public static final String NAME = "storage";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
index f52a74d..d04e2c1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
@@ -23,10 +23,18 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.skywalking.oap.server.core.query.sql.Function;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+/**
+ * Data column of all persistent entities.
+ */
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Column {
+ /**
+ * Column name in the storage. Most storage implementations keep the name as it is. But in some cases, this name
+ * could be a keyword; then the implementation will use {@link IModelOverride} to replace the column name.
+ */
String columnName();
/**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ValueColumnIds.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ValueColumnMetadata.java
similarity index 75%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ValueColumnIds.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ValueColumnMetadata.java
index 2b4767d..174f277 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ValueColumnIds.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ValueColumnMetadata.java
@@ -22,19 +22,32 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.query.sql.Function;
-public enum ValueColumnIds {
+/**
+ * ValueColumnMetadata holds the metadata for column values of metrics. The metadata of ValueColumn is declared through
+ * {@link Column} annotation.
+ */
+public enum ValueColumnMetadata {
INSTANCE;
private Map<String, ValueColumn> mapping = new HashMap<>();
- public void putIfAbsent(String indName, String valueCName, Function function) {
- mapping.putIfAbsent(indName, new ValueColumn(valueCName, function));
+ /**
+ * Register the new metadata for the given model name.
+ */
+ public void putIfAbsent(String modelName, String valueCName, Function function) {
+ mapping.putIfAbsent(modelName, new ValueColumn(valueCName, function));
}
+ /**
+ * Fetch the value column name of the given metrics name.
+ */
public String getValueCName(String metricsName) {
return findColumn(metricsName).valueCName;
}
+ /**
+ * Fetch the function for the value column of the given metrics name.
+ */
public Function getValueFunction(String metricsName) {
return findColumn(metricsName).function;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
index 11f78ee..0de46bd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
@@ -19,24 +19,27 @@
package org.apache.skywalking.oap.server.core.storage.model;
import java.util.List;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * The core module installation controller.
+ */
+@Slf4j
public abstract class ModelInstaller {
-
- private static final Logger logger = LoggerFactory.getLogger(ModelInstaller.class);
-
private final ModuleManager moduleManager;
public ModelInstaller(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
+ /**
+ * Entrance of the storage entity installation work.
+ */
public final void install(Client client) throws StorageException {
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
@@ -46,30 +49,43 @@ public abstract class ModelInstaller {
for (Model model : models) {
while (!isExists(client, model)) {
try {
- logger.info("table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model
- .getName());
+ log.info(
+ "table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.",
+ model
+ .getName()
+ );
Thread.sleep(3000L);
} catch (InterruptedException e) {
- logger.error(e.getMessage());
+ log.error(e.getMessage());
}
}
}
} else {
for (Model model : models) {
if (!isExists(client, model)) {
- logger.info("table: {} does not exist", model.getName());
+ log.info("table: {} does not exist", model.getName());
createTable(client, model);
}
}
}
}
- public final void overrideColumnName(String columnName, String newName) {
+ /**
+ * Installer implementation could use this API to request a column name replacement. This method delegates to
+ * {@link IModelOverride}.
+ */
+ protected final void overrideColumnName(String columnName, String newName) {
IModelOverride modelOverride = moduleManager.find(CoreModule.NAME).provider().getService(IModelOverride.class);
modelOverride.overrideColumnName(columnName, newName);
}
+ /**
+ * Check whether the storage entity exists. Need to implement based on the real storage.
+ */
protected abstract boolean isExists(Client client, Model model) throws StorageException;
+ /**
+ * Create the storage entity. All creations should be after the {@link #isExists(Client, Model)} check.
+ */
protected abstract void createTable(Client client, Model model) throws StorageException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 5f7b790..ad21f2a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -25,7 +25,7 @@ import lombok.Getter;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnIds;
+import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +73,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
logger.debug("The field named {} with the {} type", column.columnName(), field.getType());
}
if (column.isValue()) {
- ValueColumnIds.INSTANCE.putIfAbsent(modelName, column.columnName(), column.function());
+ ValueColumnMetadata.INSTANCE.putIfAbsent(modelName, column.columnName(), column.function());
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 33d8630..d83049a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -38,6 +38,14 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * TTL = Time To Live
+ *
+ * DataTTLKeeperTimer is an internal timer. It drives the {@link IHistoryDeleteDAO} to remove the expired data. TTL
+ * configurations are provided in {@link CoreModuleConfig}. Some storage implementations, such as ES6/ES7, provide an
+ * override TTL, which could be more suitable for the implementation. No matter which TTL configurations are set, they
+ * are all driven by this timer.
+ */
public enum DataTTLKeeperTimer {
INSTANCE;
@@ -51,10 +59,18 @@ public enum DataTTLKeeperTimer {
this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
Executors.newSingleThreadScheduledExecutor()
- .scheduleAtFixedRate(new RunnableWithExceptionProtection(this::delete, t -> logger.error("Remove data in background failure.", t)), moduleConfig
- .getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
+ .scheduleAtFixedRate(
+ new RunnableWithExceptionProtection(
+ this::delete,
+ t -> logger.error("Remove data in background failure.", t)
+ ), moduleConfig
+ .getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
}
+ /**
+ * DataTTLKeeperTimer starts in every OAP node, but the deletion only works when this node is the first node in the
+ * OAP node list from {@link ClusterNodesQuery}.
+ */
private void delete() {
List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
index 60f77db..8ee41b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
@@ -21,6 +21,12 @@ package org.apache.skywalking.oap.server.core.worker;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+/**
+ * Abstract worker definition. Provide the {@link ModuleDefineHolder} to make sure the worker could find and access
+ * services in different modules. Also, {@link #in(Object)} is provided as the primary entrance of every worker.
+ *
+ * @param <INPUT> the datatype this worker implementation processes.
+ */
public abstract class AbstractWorker<INPUT> {
@Getter
@@ -30,5 +36,8 @@ public abstract class AbstractWorker<INPUT> {
this.moduleDefineHolder = moduleDefineHolder;
}
+ /**
+ * Main entrance of this worker.
+ */
public abstract void in(INPUT input);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
index c98175a..3ba0112 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
@@ -20,8 +20,10 @@ package org.apache.skywalking.oap.server.core.worker;
import org.apache.skywalking.oap.server.library.module.Service;
+/**
+ * Worker instance finder interface. Find the worker instance among all registered worker instances by worker name.
+ */
public interface IWorkerInstanceGetter extends Service {
-
RemoteHandleWorker get(String nextWorkerName);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
index 6336a81..c71698a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
@@ -18,9 +18,18 @@
package org.apache.skywalking.oap.server.core.worker;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
+/**
+ * Worker instance register interface. Push the worker name, instance and class type having {@link Stream} annotation.
+ */
public interface IWorkerInstanceSetter extends Service {
+ /**
+ * @param remoteReceiverWorkName worker name
+ * @param instance The worker instance processes the given streamDataClass.
+ * @param streamDataClass Type of metrics.
+ */
void put(String remoteReceiverWorkName, AbstractWorker instance, Class<? extends StreamData> streamDataClass);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/RegisterServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/RegisterServiceHandler.java
index ec72b32..a7c250e 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/RegisterServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/RegisterServiceHandler.java
@@ -61,6 +61,12 @@ import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInve
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.OS_NAME;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.PROCESS_NO;
+/**
+ * RegisterServiceHandler responds to the register requests of multiple inventory entities, including service, instance,
+ * endpoint, network address and address-service mapping. Responses of service, instance and endpoint register include
+ * the IDs that represent these entities. Agents could use them in the header and data report to reduce the network
+ * bandwidth resource costs.
+ */
public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(RegisterServiceHandler.class);
@@ -109,7 +115,8 @@ public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implem
serviceType = ServiceType.normal;
}
- int serviceId = serviceInventoryRegister.getOrCreate(serviceName, NodeType.fromRegisterServiceType(serviceType), null);
+ int serviceId = serviceInventoryRegister.getOrCreate(
+ serviceName, NodeType.fromRegisterServiceType(serviceType), null);
if (serviceId != Const.NONE) {
KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build();
@@ -123,7 +130,7 @@ public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implem
@Override
public void doServiceInstanceRegister(ServiceInstances request,
- StreamObserver<ServiceInstanceRegisterMapping> responseObserver) {
+ StreamObserver<ServiceInstanceRegisterMapping> responseObserver) {
ServiceInstanceRegisterMapping.Builder builder = ServiceInstanceRegisterMapping.newBuilder();
@@ -176,8 +183,9 @@ public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implem
}
}
- int serviceInstanceId = serviceInstanceInventoryRegister.getOrCreate(instance.getServiceId(), instanceName, instanceUUID, instance
- .getTime(), instanceProperties);
+ int serviceInstanceId = serviceInstanceInventoryRegister.getOrCreate(
+ instance.getServiceId(), instanceName, instanceUUID, instance
+ .getTime(), instanceProperties);
if (serviceInstanceId != Const.NONE) {
logger.info("register service instance id={} [UUID:{}]", serviceInstanceId, instanceUUID);
@@ -237,7 +245,7 @@ public class RegisterServiceHandler extends RegisterGrpc.RegisterImplBase implem
@Override
public void doServiceAndNetworkAddressMappingRegister(ServiceAndNetworkAddressMappings request,
- StreamObserver<Commands> responseObserver) {
+ StreamObserver<Commands> responseObserver) {
request.getMappingsList().forEach(mapping -> {
int serviceId = mapping.getServiceId();
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/ServiceInstancePingServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/ServiceInstancePingServiceHandler.java
index 24e8b55..c7aec32 100755
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/ServiceInstancePingServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/ServiceInstancePingServiceHandler.java
@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.grpc;
import io.grpc.stub.StreamObserver;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.common.Command;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.register.v2.ServiceInstancePingGrpc;
@@ -32,14 +34,13 @@ import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceIn
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
+/**
+ * ServiceInstancePingServiceHandler responds to the requests for instance ping. Trigger the heartbeat update and push
+ * the commands to the downstream.
+ */
+@Slf4j
public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase implements GRPCHandler {
- private static final Logger logger = LoggerFactory.getLogger(ServiceInstancePingServiceHandler.class);
-
private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
private final IServiceInventoryRegister serviceInventoryRegister;
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
@@ -69,10 +70,14 @@ public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.S
serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
responseObserver.onNext(Commands.getDefaultInstance());
} else {
- logger.warn("Can't find service by service instance id from cache," + " service instance id is: {}, will send a reset command to agent side", serviceInstanceId);
+ log.warn(
+ "Can't find service by service instance id from cache," + " service instance id is: {}, will send a reset command to agent side",
+ serviceInstanceId
+ );
- final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request
- .getTime(), request.getServiceInstanceUUID());
+ final ServiceResetCommand resetCommand = commandService.newResetCommand(
+ request.getServiceInstanceId(), request
+ .getTime(), request.getServiceInstanceUUID());
final Command command = resetCommand.serialize().build();
final Commands nextCommands = Commands.newBuilder().addCommands(command).build();
responseObserver.onNext(nextCommands);
diff --git a/oap-server/server-starter-es7/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerStartUp.java b/oap-server/server-starter-es7/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerStartUp.java
index 6dd180a..f91f5f8 100644
--- a/oap-server/server-starter-es7/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerStartUp.java
+++ b/oap-server/server-starter-es7/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerStartUp.java
@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.starter;
+/**
+ * OAP starter specific for the ES7 storage. This includes the same code of OAPServerStartUp in the `server-starter`
+ * module.
+ */
public class OAPServerStartUp {
-
public static void main(String[] args) {
OAPServerBootstrap.start();
}
diff --git a/oap-server/server-starter/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerStartUp.java b/oap-server/server-starter/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerStartUp.java
index 6dd180a..84eebb0 100644
--- a/oap-server/server-starter/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerStartUp.java
+++ b/oap-server/server-starter/src/main/java/org/apache/skywalking/oap/server/starter/OAPServerStartUp.java
@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.starter;
+/**
+ * OAP starter
+ */
public class OAPServerStartUp {
-
public static void main(String[] args) {
OAPServerBootstrap.start();
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
index 1fa6eac..f16f8b3 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
@@ -35,12 +36,13 @@ import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariC
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * H2 table initialization. Create tables without Indexes. H2 is for the demonstration only, so, keep the logic as
+ * simple as possible.
+ */
+@Slf4j
public class H2TableInstaller extends ModelInstaller {
- private static final Logger logger = LoggerFactory.getLogger(H2TableInstaller.class);
-
public H2TableInstaller(ModuleManager moduleManager) {
super(moduleManager);
}
@@ -71,14 +73,15 @@ public class H2TableInstaller extends ModelInstaller {
for (int i = 0; i < model.getColumns().size(); i++) {
ModelColumn column = model.getColumns().get(i);
ColumnName name = column.getColumnName();
- tableCreateSQL.appendLine(name.getStorageName() + " " + getColumnType(model, name, column.getType()) + (i != model
- .getColumns()
- .size() - 1 ? "," : ""));
+ tableCreateSQL.appendLine(
+ name.getStorageName() + " " + getColumnType(model, name, column.getType()) + (i != model
+ .getColumns()
+ .size() - 1 ? "," : ""));
}
tableCreateSQL.appendLine(")");
- if (logger.isDebugEnabled()) {
- logger.debug("creating table: " + tableCreateSQL.toStringInNewLine());
+ if (log.isDebugEnabled()) {
+ log.debug("creating table: " + tableCreateSQL.toStringInNewLine());
}
try (Connection connection = h2Client.getConnection()) {
@@ -91,6 +94,9 @@ public class H2TableInstaller extends ModelInstaller {
}
+ /**
+ * Set up the data type mapping between Java type and H2 database type
+ */
protected String getColumnType(Model model, ColumnName name, Class<?> type) {
if (Integer.class.equals(type) || int.class.equals(type)) {
return "INT";
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
index 55404d9..6960883 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.sql.Connection;
import java.sql.SQLException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
@@ -33,8 +34,6 @@ import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariC
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY;
@@ -46,10 +45,8 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
/**
* Extend H2TableInstaller but match MySQL SQL syntax.
*/
+@Slf4j
public class MySQLTableInstaller extends H2TableInstaller {
-
- private static final Logger logger = LoggerFactory.getLogger(MySQLTableInstaller.class);
-
public MySQLTableInstaller(ModuleManager moduleManager) {
super(moduleManager);
/*
@@ -66,6 +63,9 @@ public class MySQLTableInstaller extends H2TableInstaller {
this.createIndexes(jdbcHikariCPClient, model);
}
+ /**
+ * Based on MySQL features, provide specific data type mappings.
+ */
@Override
protected String getColumnType(Model model, ColumnName name, Class<?> type) {
if (Integer.class.equals(type) || int.class.equals(type)) {
@@ -92,6 +92,12 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
}
+ /**
+ * Create indexes of all tables. Because MySQL storage is suitable for middle-size use cases and is also compatible
+ * with TiDB users, indexes are required for the UI query.
+ *
+ * Based on different Model, provide different index creation strategy.
+ */
protected void createIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
switch (model.getScopeId()) {
case SERVICE_INVENTORY:
@@ -198,9 +204,9 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
private void createIndex(JDBCHikariCPClient client, Connection connection, Model model,
- SQLBuilder indexSQL) throws JDBCClientException {
- if (logger.isDebugEnabled()) {
- logger.debug("create index for table {}, sql: {} ", model.getName(), indexSQL.toStringInNewLine());
+ SQLBuilder indexSQL) throws JDBCClientException {
+ if (log.isDebugEnabled()) {
+ log.debug("create index for table {}, sql: {} ", model.getName(), indexSQL.toStringInNewLine());
}
client.execute(connection, indexSQL.toString());
}