You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/06/03 10:46:51 UTC
[skywalking] branch master updated: Good news of an important
feature: time series implementation in Elasticsearch storage. (#2808)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 5036e83 Good news of an important feature: time series implementation in Elasticsearch storage. (#2808)
5036e83 is described below
commit 5036e8334301322d49973005cd98513c17b0f28f
Author: 彭勇升 pengys <pe...@apache.org>
AuthorDate: Mon Jun 3 18:46:42 2019 +0800
Good news of an important feature: time series implementation in Elasticsearch storage. (#2808)
* The new feature of Elasticsearch storage with the time series implementation.
* Time bucket util improve.
* Fixed some Gramma error.
* Update ttl.md
Polish English a little.
* Update ttl.md
Add more explicit description of TTL
* Update ttl.md
* Update StorageModuleElasticsearchConfig.java
Set otherMetricsDataTTL = 0 as default, make user more clear.
* Rename the timeSeriesAble to capableOfTimeSeries
* Fixed a test case failure.
---
docs/en/setup/backend/backend-storage.md | 14 +-
docs/en/setup/backend/ttl.md | 22 ++-
.../oap/server/core/CoreModuleConfig.java | 17 +-
.../oap/server/core/CoreModuleProvider.java | 6 +-
.../core/{DataTTL.java => DataTTLConfig.java} | 2 +-
.../oap/server/core/analysis/Downsampling.java | 2 +-
.../analysis/worker/MetricsPersistentWorker.java | 15 +-
.../analysis/worker/MetricsStreamProcessor.java | 16 +-
.../analysis/worker/RecordPersistentWorker.java | 9 +-
.../analysis/worker/RecordStreamProcessor.java | 4 +-
.../core/analysis/worker/TopNStreamProcessor.java | 4 +-
.../server/core/analysis/worker/TopNWorker.java | 9 +-
.../oap/server/core/config/ConfigService.java | 8 +-
.../server/core/register/EndpointInventory.java | 2 +-
.../core/register/NetworkAddressInventory.java | 2 +-
.../core/register/ServiceInstanceInventory.java | 2 +-
.../oap/server/core/register/ServiceInventory.java | 2 +-
.../register/worker/InventoryStreamProcessor.java | 1 +
.../oap/server/core/storage/IHistoryDeleteDAO.java | 3 +-
.../oap/server/core/storage/IMetricsDAO.java | 7 +-
.../oap/server/core/storage/IRecordDAO.java | 3 +-
.../server/core/storage/annotation/Storage.java | 2 +
.../oap/server/core/storage/model/Model.java | 37 +----
.../server/core/storage/model/ModelInstaller.java | 10 --
.../oap/server/core/storage/model/ModelName.java | 2 -
.../server/core/storage/model/StorageModels.java | 24 ++-
.../core/storage/ttl/DataTTLKeeperTimer.java | 18 +--
.../server/core/storage/ttl/DayTTLCalculator.java | 6 +-
...thTTLCalculator.java => GeneralStorageTTL.java} | 18 ++-
.../server/core/storage/ttl/HourTTLCalculator.java | 6 +-
.../core/storage/ttl/MinuteTTLCalculator.java | 6 +-
.../core/storage/ttl/MonthTTLCalculator.java | 6 +-
.../core/storage/ttl/SecondTTLCalculator.java | 6 +-
.../ttl/{TTLCalculator.java => StorageTTL.java} | 8 +-
.../oap/server/core/storage/ttl/TTLCalculator.java | 4 +-
.../core/storage/StorageInstallerTestCase.java | 8 -
.../client/elasticsearch/ElasticSearchClient.java | 39 ++++-
.../elasticsearch/ITElasticSearchClient.java | 8 +-
.../src/main/assembly/application.yml | 5 +
.../src/main/resources/application.yml | 7 +-
.../StorageModuleElasticsearchConfig.java | 176 ++++-----------------
.../StorageModuleElasticsearchProvider.java | 68 +++-----
.../elasticsearch/base/HistoryDeleteEsDAO.java | 46 +++++-
.../plugin/elasticsearch/base/MetricsEsDAO.java | 11 +-
.../plugin/elasticsearch/base/RecordEsDAO.java | 4 +-
.../elasticsearch/base/StorageEsInstaller.java | 66 ++++----
.../plugin/elasticsearch/base/TimeSeriesUtils.java | 56 +++++++
.../ttl/ElasticsearchStorageTTL.java} | 20 +--
.../elasticsearch/ttl/EsHourTTLCalculator.java} | 11 +-
.../elasticsearch/ttl/EsMinuteTTLCalculator.java} | 11 +-
.../base/TimeSeriesUtilsTestCase.java} | 12 +-
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 52 +-----
.../plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java | 27 +++-
.../storage/plugin/jdbc/h2/dao/H2MetricsDAO.java | 16 +-
.../storage/plugin/jdbc/h2/dao/H2RecordDAO.java | 9 +-
.../plugin/jdbc/h2/dao/H2TableInstaller.java | 23 +--
.../plugin/jdbc/mysql/MySQLStorageProvider.java | 50 ++----
.../plugin/jdbc/mysql/MySQLTableInstaller.java | 40 ++---
58 files changed, 487 insertions(+), 581 deletions(-)
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index 806215b..415e3a9 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -29,7 +29,7 @@ storage:
## ElasticSearch 6
Active ElasticSearch 6 as storage, set storage provider to **elasticsearch**.
-**Required ElasticSearch 6.3.0 or higher. HTTP RestHighLevelClient is used to connect server.**
+**Required ElasticSearch 6.3.2 or higher, excepted 7.0.0 or higher. HTTP RestHighLevelClient is used to connect server.**
Setting fragment example
@@ -42,6 +42,10 @@ storage:
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+ # Those data TTL settings will override the same settings in core module.
+ recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
+ otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
+ monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
@@ -61,6 +65,10 @@ storage:
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+ # Those data TTL settings will override the same settings in core module.
+ recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
+ otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
+ monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
@@ -80,6 +88,10 @@ storage:
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+ # Those data TTL settings will override the same settings in core module.
+ recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
+ otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
+ monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
diff --git a/docs/en/setup/backend/ttl.md b/docs/en/setup/backend/ttl.md
index e38978f..e78aa95 100644
--- a/docs/en/setup/backend/ttl.md
+++ b/docs/en/setup/backend/ttl.md
@@ -7,6 +7,7 @@ Metric is separated in minute/hour/day/month dimensions in storage, different in
You have following settings for different types.
```yaml
# Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
+ enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
minuteMetricsDataTTL: ${SW_CORE_MINUTE_METRIC_DATA_TTL:90} # Unit is minute
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
@@ -16,4 +17,23 @@ You have following settings for different types.
- `recordDataTTL` affects **Record** data.
- `minuteMetricsDataTTL`, `hourMetricsDataTTL`, `dayMetricsDataTTL` and `monthMetricsDataTTL` affects
-metrics data in minute/hour/day/month dimensions.
\ No newline at end of file
+metrics data in minute/hour/day/month dimensions.
+
+## ElasticSearch 6 storage TTL
+**Specifically:**
+Because of the feature of ElasticSearch, it rebuilds the index after executing delete by query command.
+That is a heavy operation, it will hang up the ElasticSearch server for a few seconds each time. The fact is there are above hundred indexes which may cause ElasticSearch out of service unexpected.
+So, we create the index by day to avoid execute delete by query operation,
+then delete the index directly, this is a high performance operation, say goodbye to hung up.
+
+You have following settings in Elasticsearch storage.
+```yaml
+ # Those data TTL settings will override the same settings in core module.
+ recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
+ otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
+ monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
+```
+
+- `recordDataTTL` affects **Record** data.
+- `otherMetricsDataTTL` affects minute/hour/day dimensions of metrics. `minuteMetricsDataTTL`, `hourMetricsDataTTL` and `dayMetricsDataTTL` are still there, but the **Unit** of them changed to **DAY** too. If you want to set them manually, please remove `otherMetricsDataTTL`.
+- `monthMetricsDataTTL` affects month dimension of metrics.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 31b59cd..908b22d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -38,6 +38,7 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter private int maxConcurrentCallsPerConnection;
@Setter private int maxMessageSize;
private final List<String> downsampling;
+ @Setter private boolean enableDataKeeperExecutor = true;
@Setter private int recordDataTTL;
@Setter private int minuteMetricsDataTTL;
@Setter private int hourMetricsDataTTL;
@@ -48,14 +49,14 @@ public class CoreModuleConfig extends ModuleConfig {
this.downsampling = new ArrayList<>();
}
- public DataTTL getDataTTL() {
- DataTTL dataTTL = new DataTTL();
- dataTTL.setRecordDataTTL(recordDataTTL);
- dataTTL.setMinuteMetricsDataTTL(minuteMetricsDataTTL);
- dataTTL.setHourMetricsDataTTL(hourMetricsDataTTL);
- dataTTL.setDayMetricsDataTTL(dayMetricsDataTTL);
- dataTTL.setMonthMetricsDataTTL(monthMetricsDataTTL);
- return dataTTL;
+ public DataTTLConfig getDataTTL() {
+ DataTTLConfig dataTTLConfig = new DataTTLConfig();
+ dataTTLConfig.setRecordDataTTL(recordDataTTL);
+ dataTTLConfig.setMinuteMetricsDataTTL(minuteMetricsDataTTL);
+ dataTTLConfig.setHourMetricsDataTTL(hourMetricsDataTTL);
+ dataTTLConfig.setDayMetricsDataTTL(dayMetricsDataTTL);
+ dataTTLConfig.setMonthMetricsDataTTL(monthMetricsDataTTL);
+ return dataTTLConfig;
}
public enum Role {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9b443d6..1545a2b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -33,7 +33,6 @@ import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHan
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.*;
@@ -180,8 +179,9 @@ public class CoreModuleProvider extends ModuleProvider {
PersistenceTimer.INSTANCE.start(getManager());
- DataTTLKeeperTimer.INSTANCE.setDataTTL(moduleConfig.getDataTTL());
- DataTTLKeeperTimer.INSTANCE.start(getManager());
+ if (moduleConfig.isEnableDataKeeperExecutor()) {
+ DataTTLKeeperTimer.INSTANCE.start(getManager());
+ }
CacheUpdateTimer.INSTANCE.start(getManager());
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/DataTTL.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/DataTTLConfig.java
similarity index 97%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/DataTTL.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/DataTTLConfig.java
index a9a0de1..34c8ef5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/DataTTL.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/DataTTLConfig.java
@@ -25,7 +25,7 @@ import lombok.*;
*/
@Setter
@Getter
-public class DataTTL {
+public class DataTTLConfig {
private int recordDataTTL;
private int minuteMetricsDataTTL;
private int hourMetricsDataTTL;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Downsampling.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Downsampling.java
index 2cb5210..f1f711f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Downsampling.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Downsampling.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis;
* @author peng-yongsheng
*/
public enum Downsampling {
- Second(0, "second"), Minute(1, "minute"), Hour(2, "hour"), Day(3, "day"), Month(4, "month");
+ None(0, ""), Second(1, "second"), Minute(2, "minute"), Hour(3, "hour"), Day(4, "day"), Month(5, "month");
private final int value;
private final String name;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index f094bd6..513f3ae 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
@@ -38,18 +39,18 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
private static final Logger logger = LoggerFactory.getLogger(MetricsPersistentWorker.class);
- private final String modelName;
+ private final Model model;
private final MergeDataCache<Metrics> mergeDataCache;
private final IMetricsDAO metricsDAO;
private final AbstractWorker<Metrics> nextAlarmWorker;
private final AbstractWorker<Metrics> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
- MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, int batchSize,
+ MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize,
IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
AbstractWorker<Metrics> nextExportWorker) {
super(moduleDefineHolder, batchSize);
- this.modelName = modelName;
+ this.model = model;
this.mergeDataCache = new MergeDataCache<>();
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = nextAlarmWorker;
@@ -67,7 +68,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
throw new UnexpectedException(e.getMessage(), e);
}
- this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + modelName, name, 1, 2000);
+ this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, 2000);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
}
@@ -101,7 +102,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
cache.getLast().collection().forEach(data -> {
Metrics dbData = null;
try {
- dbData = metricsDAO.get(modelName, data);
+ dbData = metricsDAO.get(model, data);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
@@ -110,9 +111,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
data.combine(dbData);
data.calculate();
- batchCollection.add(metricsDAO.prepareBatchUpdate(modelName, data));
+ batchCollection.add(metricsDAO.prepareBatchUpdate(model, data));
} else {
- batchCollection.add(metricsDAO.prepareBatchInsert(modelName, data));
+ batchCollection.add(metricsDAO.prepareBatchInsert(model, data));
}
if (Objects.nonNull(nextAlarmWorker)) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index b88c081..9f9499d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -71,19 +71,19 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
if (configService.shouldToHour()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Hour);
- hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+ hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToDay()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Day);
- dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+ dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToMonth()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Month);
- monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+ monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute);
- MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model.getName());
+ MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name());
@@ -92,19 +92,19 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
entryWorkers.put(metricsClass, aggregateWorker);
}
- private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, String modelName) {
+ private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
- MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, modelName,
+ MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model,
1000, metricsDAO, alarmNotifyWorker, exportWorker);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
- private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, String modelName) {
- MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, modelName,
+ private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
+ MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model,
1000, metricsDAO, null, null);
persistentWorkers.add(persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 7dfb7f4..16a0a3c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
@@ -35,15 +36,15 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa
private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);
- private final String modelName;
+ private final Model model;
private final NonMergeDataCache<Record> nonMergeDataCache;
private final IRecordDAO recordDAO;
private final DataCarrier<Record> dataCarrier;
- RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, String modelName, int batchSize,
+ RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize,
IRecordDAO recordDAO) {
super(moduleDefineHolder, batchSize);
- this.modelName = modelName;
+ this.model = model;
this.nonMergeDataCache = new NonMergeDataCache<>();
this.recordDAO = recordDAO;
@@ -71,7 +72,7 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(record -> {
try {
- batchCollection.add(recordDAO.prepareBatchInsert(modelName, record));
+ batchCollection.add(recordDAO.prepareBatchInsert(model, record));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index ff2a636..e460b5a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -64,8 +64,8 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
- Model model = modelSetter.putIfAbsent(recordClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Second);
- RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model.getName(), 1000, recordDAO);
+ Model model = modelSetter.putIfAbsent(recordClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute);
+ RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 1000, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 4ba101d..e3c648c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -60,9 +60,9 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
- Model model = modelSetter.putIfAbsent(topNClass, stream.name(), stream.scopeId(), stream.storage());
+ Model model = modelSetter.putIfAbsent(topNClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute);
- TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model.getName(), 50, recordDAO);
+ TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(topNClass, persistentWorker);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index fd6ab02..37a8265 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
@@ -38,17 +39,17 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
private final IRecordDAO recordDAO;
- private final String modelName;
+ private final Model model;
private final DataCarrier<TopN> dataCarrier;
private long reportCycle;
private volatile long lastReportTimestamp;
- public TopNWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
+ public TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
int topNSize, IRecordDAO recordDAO) {
super(moduleDefineHolder, -1);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO;
- this.modelName = modelName;
+ this.model = model;
this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000);
this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
this.lastReportTimestamp = System.currentTimeMillis();
@@ -99,7 +100,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(record -> {
try {
- batchCollection.add(recordDAO.prepareBatchInsert(modelName, record));
+ batchCollection.add(recordDAO.prepareBatchInsert(model, record));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
index 36cf031..a5f86f7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java
@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.config;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.CoreModuleConfig;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
@@ -27,11 +27,13 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/
@Getter
public class ConfigService implements Service {
- private String gRPCHost;
- private int gRPCPort;
+ private final String gRPCHost;
+ private final int gRPCPort;
+ private final DataTTLConfig dataTTLConfig;
public ConfigService(CoreModuleConfig moduleConfig) {
this.gRPCHost = moduleConfig.getGRPCHost();
this.gRPCPort = moduleConfig.getGRPCPort();
+ this.dataTTLConfig = moduleConfig.getDataTTL();
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
index 3fc2f5e..eeeeb68 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
@@ -35,7 +35,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EN
* @author peng-yongsheng
*/
@ScopeDeclaration(id = ENDPOINT_INVENTORY, name = "EndpointInventory")
-@Stream(name = EndpointInventory.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_INVENTORY, storage = @Storage(builder = EndpointInventory.Builder.class, deleteHistory = false), processor = InventoryStreamProcessor.class)
+@Stream(name = EndpointInventory.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_INVENTORY, storage = @Storage(builder = EndpointInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class)
public class EndpointInventory extends RegisterSource {
public static final String INDEX_NAME = "endpoint_inventory";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
index 26cb514..0eb5bc4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
@@ -35,7 +35,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NE
* @author peng-yongsheng
*/
@ScopeDeclaration(id = NETWORK_ADDRESS, name = "NetworkAddress")
-@Stream(name = NetworkAddressInventory.INDEX_NAME, scopeId = DefaultScopeDefine.NETWORK_ADDRESS, storage = @Storage(builder = NetworkAddressInventory.Builder.class, deleteHistory = false), processor = InventoryStreamProcessor.class)
+@Stream(name = NetworkAddressInventory.INDEX_NAME, scopeId = DefaultScopeDefine.NETWORK_ADDRESS, storage = @Storage(builder = NetworkAddressInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class)
public class NetworkAddressInventory extends RegisterSource {
public static final String INDEX_NAME = "network_address_inventory";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java
index 15a5c92..895e2a8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java
@@ -38,7 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
* @author peng-yongsheng
*/
@ScopeDeclaration(id = SERVICE_INSTANCE_INVENTORY, name = "ServiceInstanceInventory")
-@Stream(name = ServiceInstanceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY, storage = @Storage(builder = ServiceInstanceInventory.Builder.class, deleteHistory = false), processor = InventoryStreamProcessor.class)
+@Stream(name = ServiceInstanceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY, storage = @Storage(builder = ServiceInstanceInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class)
public class ServiceInstanceInventory extends RegisterSource {
public static final String INDEX_NAME = "service_instance_inventory";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
index ea9b7d7..cec189d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
@@ -37,7 +37,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
* @author peng-yongsheng
*/
@ScopeDeclaration(id = SERVICE_INVENTORY, name = "ServiceInventory")
-@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, storage = @Storage(builder = ServiceInventory.Builder.class, deleteHistory = false), processor = InventoryStreamProcessor.class)
+@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, storage = @Storage(builder = ServiceInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class)
public class ServiceInventory extends RegisterSource {
public static final String INDEX_NAME = "service_inventory";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 3111902..2cc8b7e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -55,6 +55,7 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass, stream.name(), stream.scopeId(), stream.storage());
+
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
index 382d7dd..bdca6c4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
@@ -19,11 +19,12 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
/**
* @author peng-yongsheng
*/
public interface IHistoryDeleteDAO extends DAO {
- void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException;
+ void deleteHistory(Model model, String timeBucketColumnName) throws IOException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index d256175..29ec617 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -20,15 +20,16 @@ package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
/**
* @author peng-yongsheng
*/
public interface IMetricsDAO<INSERT, UPDATE> extends DAO {
- Metrics get(String modelName, Metrics metrics) throws IOException;
+ Metrics get(Model model, Metrics metrics) throws IOException;
- INSERT prepareBatchInsert(String modelName, Metrics metrics) throws IOException;
+ INSERT prepareBatchInsert(Model model, Metrics metrics) throws IOException;
- UPDATE prepareBatchUpdate(String modelName, Metrics metrics) throws IOException;
+ UPDATE prepareBatchUpdate(Model model, Metrics metrics) throws IOException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
index 0b44884..312b89f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
@@ -20,11 +20,12 @@ package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
/**
* @author peng-yongsheng
*/
public interface IRecordDAO<INSERT> extends DAO {
- INSERT prepareBatchInsert(String modelName, Record record) throws IOException;
+ INSERT prepareBatchInsert(Model model, Record record) throws IOException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java
index 8a60be1..6a8073c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java
@@ -31,4 +31,6 @@ public @interface Storage {
Class<? extends StorageBuilder> builder();
boolean deleteHistory() default true;
+
+ boolean capableOfTimeSeries() default true;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 37232d9..b2dccc9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -20,50 +20,27 @@ package org.apache.skywalking.oap.server.core.storage.model;
import java.util.List;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
-import org.apache.skywalking.oap.server.core.storage.ttl.*;
/**
* @author peng-yongsheng
*/
@Getter
public class Model {
+
private final String name;
+ private final boolean capableOfTimeSeries;
+ private final Downsampling downsampling;
private final boolean deleteHistory;
private final List<ModelColumn> columns;
private final int scopeId;
- private final TTLCalculator ttlCalculator;
- public Model(String name, List<ModelColumn> columns, boolean deleteHistory,
- int scopeId, Downsampling downsampling) {
+ public Model(String name, List<ModelColumn> columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling) {
this.columns = columns;
+ this.capableOfTimeSeries = capableOfTimeSeries;
+ this.downsampling = downsampling;
this.deleteHistory = deleteHistory;
this.scopeId = scopeId;
-
- switch (downsampling) {
- case Minute:
- this.name = name;
- this.ttlCalculator = new MinuteTTLCalculator();
- break;
- case Hour:
- this.name = name + Const.ID_SPLIT + Downsampling.Hour.getName();
- this.ttlCalculator = new HourTTLCalculator();
- break;
- case Day:
- this.name = name + Const.ID_SPLIT + Downsampling.Day.getName();
- this.ttlCalculator = new DayTTLCalculator();
- break;
- case Month:
- this.name = name + Const.ID_SPLIT + Downsampling.Month.getName();
- this.ttlCalculator = new MonthTTLCalculator();
- break;
- case Second:
- this.name = name;
- this.ttlCalculator = new SecondTTLCalculator();
- break;
- default:
- throw new UnexpectedException("Unexpected downsampling setting.");
- }
+ this.name = ModelName.build(downsampling, name);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
index 8efdf5b..f8eed1a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
@@ -42,7 +42,6 @@ public abstract class ModelInstaller {
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
List<Model> models = modelGetter.getModels();
- boolean debug = System.getProperty("debug") != null;
if (RunningMode.isNoInitMode()) {
for (Model model : models) {
@@ -60,12 +59,7 @@ public abstract class ModelInstaller {
if (!isExists(client, model)) {
logger.info("table: {} does not exist", model.getName());
createTable(client, model);
- } else if (debug) {
- logger.info("table: {} exists", model.getName());
- deleteTable(client, model);
- createTable(client, model);
}
- columnCheck(client, model);
}
}
}
@@ -77,9 +71,5 @@ public abstract class ModelInstaller {
protected abstract boolean isExists(Client client, Model model) throws StorageException;
- protected abstract void columnCheck(Client client, Model model) throws StorageException;
-
- protected abstract void deleteTable(Client client, Model model) throws StorageException;
-
protected abstract void createTable(Client client, Model model) throws StorageException;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelName.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelName.java
index 1ee9205..170e7f8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelName.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelName.java
@@ -33,8 +33,6 @@ public class ModelName {
return modelName + Const.ID_SPLIT + Downsampling.Day.getName();
case Hour:
return modelName + Const.ID_SPLIT + Downsampling.Hour.getName();
- case Minute:
- return modelName + Const.ID_SPLIT + Downsampling.Minute.getName();
case Second:
return modelName + Const.ID_SPLIT + Downsampling.Second.getName();
default:
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 8cc43fd..bfba7d9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.core.storage.model;
import java.lang.reflect.Field;
import java.util.*;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.slf4j.*;
@@ -39,7 +39,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
}
@Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage) {
- return putIfAbsent(aClass, modelName, scopeId, storage, Downsampling.Minute);
+ return putIfAbsent(aClass, modelName, scopeId, storage, Downsampling.None);
}
@Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling) {
@@ -55,7 +55,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
List<ModelColumn> modelColumns = new LinkedList<>();
retrieval(aClass, modelName, modelColumns);
- Model model = new Model(modelName, modelColumns, storage.deleteHistory(), scopeId, downsampling);
+ Model model = new Model(modelName, modelColumns, storage.capableOfTimeSeries(), storage.deleteHistory(), scopeId, downsampling);
models.add(model);
return model;
@@ -83,15 +83,13 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
}
@Override public void overrideColumnName(String columnName, String newName) {
- models.forEach(model -> {
- model.getColumns().forEach(column -> {
- ColumnName existColumnName = column.getColumnName();
- String name = existColumnName.getName();
- if (name.equals(columnName)) {
- existColumnName.setStorageName(newName);
- logger.debug("Model {} column {} has been override. The new column name is {}.", model.getName(), name, newName);
- }
- });
- });
+ models.forEach(model -> model.getColumns().forEach(column -> {
+ ColumnName existColumnName = column.getColumnName();
+ String name = existColumnName.getName();
+ if (name.equals(columnName)) {
+ existColumnName.setStorageName(newName);
+ logger.debug("Model {} column {} has been override. The new column name is {}.", model.getName(), name, newName);
+ }
+ }));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 794a9cc..5e7220f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -21,16 +21,14 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.*;
-import lombok.Setter;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.joda.time.DateTime;
import org.slf4j.*;
/**
@@ -43,7 +41,6 @@ public enum DataTTLKeeperTimer {
private ModuleManager moduleManager;
private ClusterNodesQuery clusterNodesQuery;
- @Setter private DataTTL dataTTL;
public void start(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
@@ -51,7 +48,7 @@ public enum DataTTLKeeperTimer {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::delete,
- t -> logger.error("Remove data in background failure.", t)), 1, 5, TimeUnit.MINUTES);
+ t -> logger.error("Remove data in background failure.", t)), 5, 5, TimeUnit.MINUTES);
}
private void delete() {
@@ -62,23 +59,20 @@ public enum DataTTLKeeperTimer {
}
logger.info("Beginning to remove expired metrics from the storage.");
-
- DateTime currentTime = new DateTime();
-
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
List<Model> models = modelGetter.getModels();
models.forEach(model -> {
if (model.isDeleteHistory()) {
- execute(model, model.getTtlCalculator().timeBefore(currentTime, dataTTL));
+ execute(model);
}
});
}
- private void execute(Model model, long timeBucketBefore) {
+ private void execute(Model model) {
try {
- moduleManager.find(StorageModule.NAME).provider().getService(IHistoryDeleteDAO.class).deleteHistory(model.getName(), Metrics.TIME_BUCKET, timeBucketBefore);
+ moduleManager.find(StorageModule.NAME).provider().getService(IHistoryDeleteDAO.class).deleteHistory(model, Metrics.TIME_BUCKET);
} catch (IOException e) {
- logger.warn("History of {} delete failure, time bucket {}", model.getName(), timeBucketBefore);
+ logger.warn("History of {} delete failure", model.getName());
logger.error(e.getMessage(), e);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
index d14ad77..106ca8e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
@@ -17,7 +17,7 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
import org.joda.time.DateTime;
/**
@@ -25,7 +25,7 @@ import org.joda.time.DateTime;
*/
public class DayTTLCalculator implements TTLCalculator {
- @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
- return Long.valueOf(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd"));
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusDays(0 - dataTTLConfig.getDayMetricsDataTTL()).toString("yyyyMMdd"));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java
similarity index 63%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java
index a834aa8..8f50e59 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java
@@ -17,15 +17,23 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
-import org.joda.time.DateTime;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
/**
* @author peng-yongsheng
*/
-public class MonthTTLCalculator implements TTLCalculator {
+public class GeneralStorageTTL implements StorageTTL {
- @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
- return Long.valueOf(currentTime.plusMonths(0 - dataTTL.getMonthMetricsDataTTL()).toString("yyyyMM"));
+ @Override public TTLCalculator calculator(Downsampling downsampling) {
+ switch (downsampling) {
+ case Hour:
+ return new HourTTLCalculator();
+ case Day:
+ return new DayTTLCalculator();
+ case Month:
+ return new MonthTTLCalculator();
+ default:
+ return new MinuteTTLCalculator();
+ }
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
index d041c42..3caf1e9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
@@ -17,7 +17,7 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
import org.joda.time.DateTime;
/**
@@ -25,7 +25,7 @@ import org.joda.time.DateTime;
*/
public class HourTTLCalculator implements TTLCalculator {
- @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
- return Long.valueOf(currentTime.plusHours(0 - dataTTL.getHourMetricsDataTTL()).toString("yyyyMMddHH"));
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusHours(0 - dataTTLConfig.getHourMetricsDataTTL()).toString("yyyyMMddHH"));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
index 3d69f72..83d30e6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
@@ -17,7 +17,7 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
import org.joda.time.DateTime;
/**
@@ -25,7 +25,7 @@ import org.joda.time.DateTime;
*/
public class MinuteTTLCalculator implements TTLCalculator {
- @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
- return Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getMinuteMetricsDataTTL()).toString("yyyyMMddHHmm"));
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusMinutes(0 - dataTTLConfig.getMinuteMetricsDataTTL()).toString("yyyyMMddHHmm"));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
index a834aa8..46c61ce 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
@@ -17,7 +17,7 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
import org.joda.time.DateTime;
/**
@@ -25,7 +25,7 @@ import org.joda.time.DateTime;
*/
public class MonthTTLCalculator implements TTLCalculator {
- @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
- return Long.valueOf(currentTime.plusMonths(0 - dataTTL.getMonthMetricsDataTTL()).toString("yyyyMM"));
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusMonths(0 - dataTTLConfig.getMonthMetricsDataTTL()).toString("yyyyMM"));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
index d53cf89..e15aac9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
@@ -17,7 +17,7 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
import org.joda.time.DateTime;
/**
@@ -25,7 +25,7 @@ import org.joda.time.DateTime;
*/
public class SecondTTLCalculator implements TTLCalculator {
- @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
- return Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getRecordDataTTL()).toString("yyyyMMddHHmmss"));
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusMinutes(0 - dataTTLConfig.getRecordDataTTL()).toString("yyyyMMddHHmmss"));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
similarity index 83%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
index fb80add..0f38659 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
@@ -17,13 +17,11 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
-import org.joda.time.DateTime;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
/**
* @author peng-yongsheng
*/
-public interface TTLCalculator {
-
- long timeBefore(DateTime currentTime, DataTTL dataTTL);
+public interface StorageTTL {
+ TTLCalculator calculator(Downsampling downsampling);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
index fb80add..36a522b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
@@ -17,7 +17,7 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
import org.joda.time.DateTime;
/**
@@ -25,5 +25,5 @@ import org.joda.time.DateTime;
*/
public interface TTLCalculator {
- long timeBefore(DateTime currentTime, DataTTL dataTTL);
+ long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig);
}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
index e0d81b8..73c4c76 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
@@ -62,14 +62,6 @@ public class StorageInstallerTestCase {
return false;
}
- @Override protected void columnCheck(Client client, Model tableDefine) throws StorageException {
-
- }
-
- @Override protected void deleteTable(Client client, Model tableDefine) throws StorageException {
-
- }
-
@Override protected void createTable(Client client, Model tableDefine) throws StorageException {
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 90714f6..f37d0f6 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -98,6 +98,15 @@ public class ElasticSearchClient implements Client {
return httpHosts;
}
+ public boolean createIndex(String indexName) throws IOException {
+ indexName = formatIndexName(indexName);
+
+ CreateIndexRequest request = new CreateIndexRequest(indexName);
+ CreateIndexResponse response = client.indices().create(request);
+ logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+ return response.isAcknowledged();
+ }
+
public boolean createIndex(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
@@ -108,6 +117,22 @@ public class ElasticSearchClient implements Client {
return response.isAcknowledged();
}
+ public List<String> retrievalIndexByAliases(String aliases) throws IOException {
+ aliases = formatIndexName(aliases);
+
+ Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/_alias/" + aliases);
+
+ List<String> indexes = new ArrayList<>();
+ if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
+ Gson gson = new Gson();
+ InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
+ JsonObject responseJson = gson.fromJson(reader, JsonObject.class);
+ logger.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
+ indexes.addAll(responseJson.keySet());
+ }
+ return indexes;
+ }
+
public JsonObject getIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
GetIndexRequest request = new GetIndexRequest();
@@ -140,9 +165,9 @@ public class ElasticSearchClient implements Client {
Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode == 200) {
+ if (statusCode == HttpStatus.SC_OK) {
return true;
- } else if (statusCode == 404) {
+ } else if (statusCode == HttpStatus.SC_NOT_FOUND) {
return false;
} else {
throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
@@ -153,24 +178,28 @@ public class ElasticSearchClient implements Client {
indexName = formatIndexName(indexName);
JsonArray patterns = new JsonArray();
- patterns.add(indexName + "_*");
+ patterns.add(indexName + "-*");
+
+ JsonObject aliases = new JsonObject();
+ aliases.add(indexName, new JsonObject());
JsonObject template = new JsonObject();
template.add("index_patterns", patterns);
+ template.add("aliases", aliases);
template.add("settings", settings);
template.add("mappings", mapping);
HttpEntity entity = new NStringEntity(template.toString(), ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient().performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
- return response.getStatusLine().getStatusCode() == 200;
+ return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
}
public boolean deleteTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
Response response = client.getLowLevelClient().performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
- return response.getStatusLine().getStatusCode() == 200;
+ return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
}
public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
index 4e7d483..a899133 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
@@ -153,12 +153,12 @@ public class ITElasticSearchClient {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
.field("name", "pengys")
.endObject();
- client.forceInsert(indexName + "_2019", "testid", builder);
+ client.forceInsert(indexName + "-2019", "testid", builder);
- JsonObject index = client.getIndex(indexName + "_2019");
+ JsonObject index = client.getIndex(indexName + "-2019");
logger.info(index.toString());
- Assert.assertEquals(1, index.getAsJsonObject(indexName + "_2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
- Assert.assertEquals(0, index.getAsJsonObject(indexName + "_2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
+ Assert.assertEquals(1, index.getAsJsonObject(indexName + "-2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
+ Assert.assertEquals(0, index.getAsJsonObject(indexName + "-2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
client.deleteTemplate(indexName);
Assert.assertFalse(client.isExistsTemplate(indexName));
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index 708b1a6..97d12b1 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -52,6 +52,7 @@ core:
- Day
- Month
# Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
+ enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
minuteMetricsDataTTL: ${SW_CORE_MINUTE_METRIC_DATA_TTL:90} # Unit is minute
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
@@ -65,6 +66,10 @@ storage:
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+# # Those data TTL settings will override the same settings in core module.
+# recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
+# otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
+# monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
# bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index b70ef71..bac8a84 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -52,10 +52,11 @@ core:
- Day
- Month
# Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
+ enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
minuteMetricsDataTTL: ${SW_CORE_MINUTE_METRIC_DATA_TTL:90} # Unit is minute
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
- dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
+ otherMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
storage:
elasticsearch:
@@ -65,6 +66,10 @@ storage:
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+ # Those data TTL settings will override the same settings in core module.
+ recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
+ otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
+ monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index dffce36..c159698 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -18,159 +18,39 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
-import lombok.Getter;
-import lombok.Setter;
+import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
+@Getter
public class StorageModuleElasticsearchConfig extends ModuleConfig {
-
- @Setter @Getter private String nameSpace;
- @Setter @Getter private String clusterNodes;
- private int indexShardsNumber;
- private int indexReplicasNumber;
- private boolean highPerformanceMode;
- private int traceDataTTL = 90;
- private int minuteMetricDataTTL = 90;
- private int hourMetricDataTTL = 36;
- private int dayMetricDataTTL = 45;
- private int monthMetricDataTTL = 18;
- private int bulkActions = 2000;
- private int bulkSize = 20;
- private int flushInterval = 10;
- private int concurrentRequests = 2;
- private String user;
- private String password;
- private int metadataQueryMaxSize = 5000;
- private int segmentQueryMaxSize = 200;
-
- public String getUser() {
- return user;
- }
-
- public void setUser(String user) {
- this.user = user;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public int getIndexShardsNumber() {
- return indexShardsNumber;
- }
-
- void setIndexShardsNumber(int indexShardsNumber) {
- this.indexShardsNumber = indexShardsNumber;
- }
-
- public int getIndexReplicasNumber() {
- return indexReplicasNumber;
- }
-
- void setIndexReplicasNumber(int indexReplicasNumber) {
- this.indexReplicasNumber = indexReplicasNumber;
- }
-
- boolean isHighPerformanceMode() {
- return highPerformanceMode;
- }
-
- void setHighPerformanceMode(boolean highPerformanceMode) {
- this.highPerformanceMode = highPerformanceMode;
- }
-
- public int getTraceDataTTL() {
- return traceDataTTL;
- }
-
- void setTraceDataTTL(int traceDataTTL) {
- this.traceDataTTL = traceDataTTL == 0 ? 90 : traceDataTTL;
- }
-
- public int getMinuteMetricDataTTL() {
- return minuteMetricDataTTL;
- }
-
- void setMinuteMetricDataTTL(int minuteMetricDataTTL) {
- this.minuteMetricDataTTL = minuteMetricDataTTL == 0 ? 90 : minuteMetricDataTTL;
- }
-
- public int getHourMetricDataTTL() {
- return hourMetricDataTTL;
- }
-
- void setHourMetricDataTTL(int hourMetricDataTTL) {
- this.hourMetricDataTTL = hourMetricDataTTL == 0 ? 36 : hourMetricDataTTL;
- }
-
- public int getDayMetricDataTTL() {
- return dayMetricDataTTL;
- }
-
- void setDayMetricDataTTL(int dayMetricDataTTL) {
- this.dayMetricDataTTL = dayMetricDataTTL == 0 ? 45 : dayMetricDataTTL;
- }
-
- public int getMonthMetricDataTTL() {
- return monthMetricDataTTL;
- }
-
- void setMonthMetricDataTTL(int monthMetricDataTTL) {
- this.monthMetricDataTTL = monthMetricDataTTL == 0 ? 18 : monthMetricDataTTL;
- }
-
- public int getBulkActions() {
- return bulkActions;
- }
-
- public void setBulkActions(int bulkActions) {
- this.bulkActions = bulkActions == 0 ? 2000 : bulkActions;
- }
-
- public int getBulkSize() {
- return bulkSize;
- }
-
- public void setBulkSize(int bulkSize) {
- this.bulkSize = bulkSize == 0 ? 20 : bulkSize;
- }
-
- public int getFlushInterval() {
- return flushInterval;
- }
-
- public void setFlushInterval(int flushInterval) {
- this.flushInterval = flushInterval == 0 ? 10 : flushInterval;
- }
-
- public int getConcurrentRequests() {
- return concurrentRequests;
- }
-
- public void setConcurrentRequests(int concurrentRequests) {
- this.concurrentRequests = concurrentRequests == 0 ? 2 : concurrentRequests;
- }
-
- public int getMetadataQueryMaxSize() {
- return metadataQueryMaxSize;
- }
-
- public void setMetadataQueryMaxSize(int metadataQueryMaxSize) {
- this.metadataQueryMaxSize = metadataQueryMaxSize;
- }
-
- public int getSegmentQueryMaxSize() {
- return segmentQueryMaxSize;
- }
-
- public void setSegmentQueryMaxSize(int segmentQueryMaxSize) {
- this.segmentQueryMaxSize = segmentQueryMaxSize;
+ @Setter private String nameSpace;
+ @Setter private String clusterNodes;
+ @Setter private int indexShardsNumber;
+ @Setter private int indexReplicasNumber;
+ @Setter private boolean highPerformanceMode;
+ @Setter private int bulkActions = 2000;
+ @Setter private int bulkSize = 20;
+ @Setter private int flushInterval = 10;
+ @Setter private int concurrentRequests = 2;
+ @Setter private String user;
+ @Setter private String password;
+ @Setter private int metadataQueryMaxSize = 5000;
+ @Setter private int segmentQueryMaxSize = 200;
+ @Setter private int recordDataTTL = 7;
+ @Setter private int minuteMetricsDataTTL = 2;
+ @Setter private int hourMetricsDataTTL = 2;
+ @Setter private int dayMetricsDataTTL = 2;
+ private int otherMetricsDataTTL = 0;
+ @Setter private int monthMetricsDataTTL = 18;
+
+ public void setOtherMetricsDataTTL(int otherMetricsDataTTL) {
+ if (otherMetricsDataTTL > 0) {
+ minuteMetricsDataTTL = otherMetricsDataTTL;
+ hourMetricsDataTTL = otherMetricsDataTTL;
+ dayMetricsDataTTL = otherMetricsDataTTL;
+ }
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 5382955..6b90910 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -21,58 +21,23 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import java.io.IOException;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.EndpointInventoryCacheEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressInventoryCacheEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInventoryCacheEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockDAOImpl;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockInstaller;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AlarmQueryEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.LogQueryEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetadataQueryEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetricsQueryEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNRecordsQueryEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl.ElasticsearchStorageTTL;
/**
* @author peng-yongsheng
*/
public class StorageModuleElasticsearchProvider extends ModuleProvider {
- private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
-
protected final StorageModuleElasticsearchConfig config;
protected ElasticSearchClient elasticSearchClient;
@@ -106,7 +71,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient, new ElasticsearchStorageTTL()));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
@@ -125,6 +90,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override
public void start() throws ModuleStartException {
+ overrideCoreModuleTTLConfig();
+
try {
elasticSearchClient.connect();
@@ -146,4 +113,13 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
+
+ private void overrideCoreModuleTTLConfig() {
+ ConfigService configService = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class);
+ configService.getDataTTLConfig().setRecordDataTTL(config.getRecordDataTTL());
+ configService.getDataTTLConfig().setMinuteMetricsDataTTL(config.getMinuteMetricsDataTTL());
+ configService.getDataTTLConfig().setHourMetricsDataTTL(config.getHourMetricsDataTTL());
+ configService.getDataTTLConfig().setDayMetricsDataTTL(config.getDayMetricsDataTTL());
+ configService.getDataTTLConfig().setMonthMetricsDataTTL(config.getMonthMetricsDataTTL());
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index 44c6fdd..b2a273f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -19,8 +19,15 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.joda.time.DateTime;
import org.slf4j.*;
/**
@@ -30,16 +37,45 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
private static final Logger logger = LoggerFactory.getLogger(HistoryDeleteEsDAO.class);
- public HistoryDeleteEsDAO(ElasticSearchClient client) {
+ private final StorageTTL storageTTL;
+ private final ModuleDefineHolder moduleDefineHolder;
+
+ public HistoryDeleteEsDAO(ModuleDefineHolder moduleDefineHolder, ElasticSearchClient client, StorageTTL storageTTL) {
super(client);
+ this.moduleDefineHolder = moduleDefineHolder;
+ this.storageTTL = storageTTL;
}
@Override
- public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
+ public void deleteHistory(Model model, String timeBucketColumnName) throws IOException {
+ ConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ConfigService.class);
+
ElasticSearchClient client = getClient();
- int statusCode = client.delete(modelName, timeBucketColumnName, timeBucketBefore);
- if (logger.isDebugEnabled()) {
- logger.debug("Delete history from {} index, status code {}", client.formatIndexName(modelName), statusCode);
+ long timeBefore = storageTTL.calculator(model.getDownsampling()).timeBefore(new DateTime(), configService.getDataTTLConfig());
+
+ if (model.isCapableOfTimeSeries()) {
+ List<String> indexes = client.retrievalIndexByAliases(model.getName());
+
+ List<String> prepareDeleteIndexes = new ArrayList<>();
+ for (String index : indexes) {
+ long timeSeries = TimeSeriesUtils.indexTimeSeries(index);
+ if (timeBefore >= timeSeries) {
+ prepareDeleteIndexes.add(index);
+ }
+ }
+
+ if (indexes.size() == prepareDeleteIndexes.size()) {
+ client.createIndex(TimeSeriesUtils.timeSeries(model));
+ }
+
+ for (String prepareDeleteIndex : prepareDeleteIndexes) {
+ client.deleteIndex(prepareDeleteIndex);
+ }
+ } else {
+ int statusCode = client.delete(model.getName(), timeBucketColumnName, timeBefore);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Delete history from {} index, status code {}", client.formatIndexName(model.getName()), statusCode);
+ }
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 7693d57..58a42c7 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -39,8 +40,8 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO<IndexRequest, Upd
this.storageBuilder = storageBuilder;
}
- @Override public Metrics get(String modelName, Metrics metrics) throws IOException {
- GetResponse response = getClient().get(modelName, metrics.id());
+ @Override public Metrics get(Model model, Metrics metrics) throws IOException {
+ GetResponse response = getClient().get(model.getName(), metrics.id());
if (response.isExists()) {
return storageBuilder.map2Data(response.getSource());
} else {
@@ -48,13 +49,15 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO<IndexRequest, Upd
}
}
- @Override public IndexRequest prepareBatchInsert(String modelName, Metrics metrics) throws IOException {
+ @Override public IndexRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.data2Map(metrics));
+ String modelName = TimeSeriesUtils.timeSeries(model, metrics.getTimeBucket());
return getClient().prepareInsert(modelName, metrics.id(), builder);
}
- @Override public UpdateRequest prepareBatchUpdate(String modelName, Metrics metrics) throws IOException {
+ @Override public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.data2Map(metrics));
+ String modelName = TimeSeriesUtils.timeSeries(model, metrics.getTimeBucket());
return getClient().prepareUpdate(modelName, metrics.id(), builder);
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
index f2fa26e..05bb6d2 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -37,8 +38,9 @@ public class RecordEsDAO extends EsDAO implements IRecordDAO<IndexRequest> {
this.storageBuilder = storageBuilder;
}
- @Override public IndexRequest prepareBatchInsert(String modelName, Record record) throws IOException {
+ @Override public IndexRequest prepareBatchInsert(Model model, Record record) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.data2Map(record));
+ String modelName = TimeSeriesUtils.timeSeries(model, record.getTimeBucket());
return getClient().prepareInsert(modelName, record.id(), builder);
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 47a7ad4..7e9f006 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -45,49 +45,53 @@ public class StorageEsInstaller extends ModelInstaller {
this.columnTypeEsMapping = new ColumnTypeEsMapping();
}
- @Override protected boolean isExists(Client client, Model tableDefine) throws StorageException {
+ @Override protected boolean isExists(Client client, Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
try {
- return esClient.isExistsIndex(tableDefine.getName());
- } catch (IOException e) {
- throw new StorageException(e.getMessage());
- }
- }
-
- @Override protected void columnCheck(Client client, Model tableDefine) {
-
- }
-
- @Override protected void deleteTable(Client client, Model tableDefine) throws StorageException {
- ElasticSearchClient esClient = (ElasticSearchClient)client;
-
- try {
- if (!esClient.deleteIndex(tableDefine.getName())) {
- throw new StorageException(tableDefine.getName() + " index delete failure.");
+ if (model.isCapableOfTimeSeries()) {
+ return esClient.isExistsTemplate(model.getName()) && esClient.isExistsIndex(model.getName());
+ } else {
+ return esClient.isExistsIndex(model.getName());
}
} catch (IOException e) {
- throw new StorageException(tableDefine.getName() + " index delete failure.");
+ throw new StorageException(e.getMessage());
}
}
- @Override protected void createTable(Client client, Model tableDefine) throws StorageException {
+ @Override protected void createTable(Client client, Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
JsonObject settings = createSetting();
- JsonObject mapping = createMapping(tableDefine);
- logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), mapping.toString());
+ JsonObject mapping = createMapping(model);
+ logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping.toString());
- boolean isAcknowledged;
try {
- isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mapping);
+ if (model.isCapableOfTimeSeries()) {
+ if (!esClient.isExistsTemplate(model.getName())) {
+ boolean isAcknowledged = esClient.createTemplate(model.getName(), settings, mapping);
+ logger.info("create {} index template finished, isAcknowledged: {}", model.getName(), isAcknowledged);
+ if (!isAcknowledged) {
+ throw new StorageException("create " + model.getName() + " index template failure, ");
+ }
+ }
+ if (!esClient.isExistsIndex(model.getName())) {
+ String timeSeriesIndexName = TimeSeriesUtils.timeSeries(model);
+ boolean isAcknowledged = esClient.createIndex(timeSeriesIndexName);
+ logger.info("create {} index finished, isAcknowledged: {}", timeSeriesIndexName, isAcknowledged);
+ if (!isAcknowledged) {
+ throw new StorageException("create " + timeSeriesIndexName + " time series index failure, ");
+ }
+ }
+ } else {
+ boolean isAcknowledged = esClient.createIndex(model.getName(), settings, mapping);
+ logger.info("create {} index finished, isAcknowledged: {}", model.getName(), isAcknowledged);
+ if (!isAcknowledged) {
+ throw new StorageException("create " + model.getName() + " index failure, ");
+ }
+ }
} catch (IOException e) {
throw new StorageException(e.getMessage());
}
- logger.info("create {} index finished, isAcknowledged: {}", esClient.formatIndexName(tableDefine.getName()), isAcknowledged);
-
- if (!isAcknowledged) {
- throw new StorageException("create " + esClient.formatIndexName(tableDefine.getName()) + " index failure, ");
- }
}
private JsonObject createSetting() {
@@ -99,7 +103,7 @@ public class StorageEsInstaller extends ModelInstaller {
return setting;
}
- private JsonObject createMapping(Model tableDefine) {
+ private JsonObject createMapping(Model model) {
JsonObject mapping = new JsonObject();
mapping.add(ElasticSearchClient.TYPE, new JsonObject());
@@ -108,7 +112,7 @@ public class StorageEsInstaller extends ModelInstaller {
JsonObject properties = new JsonObject();
type.add("properties", properties);
- for (ModelColumn columnDefine : tableDefine.getColumns()) {
+ for (ModelColumn columnDefine : model.getColumns()) {
if (columnDefine.isMatchQuery()) {
String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
@@ -128,7 +132,7 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
- logger.debug("create elasticsearch index: {}", mapping.toString());
+ logger.debug("elasticsearch index template setting: {}", mapping.toString());
return mapping;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
new file mode 100644
index 0000000..97c5221
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+
+/**
+ * @author peng-yongsheng
+ */
+class TimeSeriesUtils {
+
+ static String timeSeries(Model model) {
+ long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
+ return timeSeries(model, timeBucket);
+ }
+
+ static String timeSeries(Model model, long timeBucket) {
+ if (!model.isCapableOfTimeSeries()) {
+ return model.getName();
+ }
+
+ switch (model.getDownsampling()) {
+ case None:
+ return model.getName();
+ case Hour:
+ return model.getName() + Const.LINE + timeBucket / 100;
+ case Minute:
+ return model.getName() + Const.LINE + timeBucket / 10000;
+ case Second:
+ return model.getName() + Const.LINE + timeBucket / 1000000;
+ default:
+ return model.getName() + Const.LINE + timeBucket;
+ }
+ }
+
+ static long indexTimeSeries(String indexName) {
+ return Long.valueOf(indexName.substring(indexName.lastIndexOf(Const.LINE) + 1));
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelName.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java
similarity index 59%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelName.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java
index 1ee9205..5779de4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelName.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java
@@ -15,30 +15,26 @@
* limitations under the License.
*/
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl;
-import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.storage.ttl.*;
/**
* @author peng-yongsheng
*/
-public class ModelName {
+public class ElasticsearchStorageTTL implements StorageTTL {
- public static String build(Downsampling downsampling, String modelName) {
+ @Override public TTLCalculator calculator(Downsampling downsampling) {
switch (downsampling) {
case Month:
- return modelName + Const.ID_SPLIT + Downsampling.Month.getName();
- case Day:
- return modelName + Const.ID_SPLIT + Downsampling.Day.getName();
+ return new MonthTTLCalculator();
case Hour:
- return modelName + Const.ID_SPLIT + Downsampling.Hour.getName();
+ return new EsHourTTLCalculator();
case Minute:
- return modelName + Const.ID_SPLIT + Downsampling.Minute.getName();
- case Second:
- return modelName + Const.ID_SPLIT + Downsampling.Second.getName();
+ return new EsMinuteTTLCalculator();
default:
- return modelName;
+ return new DayTTLCalculator();
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsHourTTLCalculator.java
similarity index 69%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsHourTTLCalculator.java
index d14ad77..a36a16e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsHourTTLCalculator.java
@@ -15,17 +15,18 @@
* limitations under the License.
*/
-package org.apache.skywalking.oap.server.core.storage.ttl;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
+import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator;
import org.joda.time.DateTime;
/**
* @author peng-yongsheng
*/
-public class DayTTLCalculator implements TTLCalculator {
+public class EsHourTTLCalculator implements TTLCalculator {
- @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
- return Long.valueOf(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd"));
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusDays(0 - dataTTLConfig.getHourMetricsDataTTL()).toString("yyyyMMdd"));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsMinuteTTLCalculator.java
similarity index 69%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsMinuteTTLCalculator.java
index d14ad77..825b8ce 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsMinuteTTLCalculator.java
@@ -15,17 +15,18 @@
* limitations under the License.
*/
-package org.apache.skywalking.oap.server.core.storage.ttl;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
+import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator;
import org.joda.time.DateTime;
/**
* @author peng-yongsheng
*/
-public class DayTTLCalculator implements TTLCalculator {
+public class EsMinuteTTLCalculator implements TTLCalculator {
- @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
- return Long.valueOf(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd"));
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusDays(0 - dataTTLConfig.getMinuteMetricsDataTTL()).toString("yyyyMMdd"));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTestCase.java
similarity index 74%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTestCase.java
index 382d7dd..a4bec73 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTestCase.java
@@ -13,17 +13,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
-import java.io.IOException;
+import org.junit.*;
/**
* @author peng-yongsheng
*/
-public interface IHistoryDeleteDAO extends DAO {
+public class TimeSeriesUtilsTestCase {
- void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException;
+ @Test
+ public void indexTimeSeries() {
+ Assert.assertEquals(20190602, TimeSeriesUtils.indexTimeSeries("Index_Test-20190602"));
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index e2135f5..26bf2c6 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -20,50 +20,14 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.core.storage.ttl.GeneralStorageTTL;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2LogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricsQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockInstaller;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
+import org.slf4j.*;
/**
* H2 Storage provider is for demonstration and preview only. I will find that haven't implemented several interfaces,
@@ -122,7 +86,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
+ this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(getManager(), h2Client, new GeneralStorageTTL()));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
this.registerServiceImplementation(ILogQueryDAO.class, new H2LogQueryDAO(h2Client));
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
index 5c0d48e..69fcd40 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
@@ -19,29 +19,42 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
+import java.sql.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
+import org.joda.time.DateTime;
/**
* @author wusheng
*/
public class H2HistoryDeleteDAO implements IHistoryDeleteDAO {
- private JDBCHikariCPClient client;
- public H2HistoryDeleteDAO(JDBCHikariCPClient client) {
+ private final JDBCHikariCPClient client;
+ private final StorageTTL storageTTL;
+ private final ModuleDefineHolder moduleDefineHolder;
+
+ public H2HistoryDeleteDAO(ModuleDefineHolder moduleDefineHolder, JDBCHikariCPClient client, StorageTTL storageTTL) {
this.client = client;
+ this.storageTTL = storageTTL;
+ this.moduleDefineHolder = moduleDefineHolder;
}
@Override
- public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
- SQLBuilder dataDeleteSQL = new SQLBuilder("delete from " + modelName + " where ").append(timeBucketColumnName).append("<= ?");
+ public void deleteHistory(Model model, String timeBucketColumnName) throws IOException {
+ ConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ConfigService.class);
+
+ SQLBuilder dataDeleteSQL = new SQLBuilder("delete from " + model.getName() + " where ").append(timeBucketColumnName).append("<= ?");
try (Connection connection = client.getConnection()) {
- client.execute(connection, dataDeleteSQL.toString(), timeBucketBefore);
+ long timeBefore = storageTTL.calculator(model.getDownsampling()).timeBefore(new DateTime(), configService.getDataTTLConfig());
+ client.execute(connection, dataDeleteSQL.toString(), timeBefore);
} catch (JDBCClientException | SQLException e) {
throw new IOException(e.getMessage(), e);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
index 4120017..70318cc 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
@@ -37,15 +37,15 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO<SQLExecut
this.storageBuilder = storageBuilder;
}
- @Override public Metrics get(String modelName, Metrics metrics) throws IOException {
- return (Metrics)getByID(h2Client, modelName, metrics.id(), storageBuilder);
+ @Override public Metrics get(Model model, Metrics metrics) throws IOException {
+ return (Metrics)getByID(h2Client, model.getName(), metrics.id(), storageBuilder);
}
- @Override public SQLExecutor prepareBatchInsert(String modelName, Metrics metrics) throws IOException {
- return getInsertExecutor(modelName, metrics, storageBuilder);
+ @Override public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
+ return getInsertExecutor(model.getName(), metrics, storageBuilder);
}
- @Override public SQLExecutor prepareBatchUpdate(String modelName, Metrics metrics) throws IOException {
- return getUpdateExecutor(modelName, metrics, storageBuilder);
+ @Override public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
+ return getUpdateExecutor(model.getName(), metrics, storageBuilder);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
index f9a2a43..aa4c375 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
* @author wusheng
*/
public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO<SQLExecutor> {
+
private JDBCHikariCPClient h2Client;
private StorageBuilder<Record> storageBuilder;
@@ -37,7 +38,7 @@ public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO<SQLExecutor
this.storageBuilder = storageBuilder;
}
- @Override public SQLExecutor prepareBatchInsert(String modelName, Record record) throws IOException {
- return getInsertExecutor(modelName, record, storageBuilder);
+ @Override public SQLExecutor prepareBatchInsert(Model model, Record record) throws IOException {
+ return getInsertExecutor(model.getName(), record, storageBuilder);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
index e2336f0..248d493 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
@@ -18,23 +18,16 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
+import java.sql.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueArray;
import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
-import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.*;
+import org.slf4j.*;
public class H2TableInstaller extends ModelInstaller {
private static final Logger logger = LoggerFactory.getLogger(H2TableInstaller.class);
@@ -60,14 +53,6 @@ public class H2TableInstaller extends ModelInstaller {
return false;
}
- @Override protected void columnCheck(Client client, Model model) throws StorageException {
-
- }
-
- @Override protected void deleteTable(Client client, Model model) throws StorageException {
-
- }
-
@Override protected void createTable(Client client, Model model) throws StorageException {
JDBCHikariCPClient h2Client = (JDBCHikariCPClient)client;
SQLBuilder tableCreateSQL = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + model.getName() + " (");
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index f780f4c..e5d5c7a 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -21,48 +21,16 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.io.IOException;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.*;
+import org.apache.skywalking.oap.server.core.storage.ttl.GeneralStorageTTL;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricsQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockInstaller;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.*;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
+import org.slf4j.*;
/**
* MySQL storage provider should be secondary choice for production usage as SkyWalking storage solution. It enhanced
@@ -124,7 +92,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
+ this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(getManager(), mysqlClient, new GeneralStorageTTL()));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient));
this.registerServiceImplementation(ILogQueryDAO.class, new MySQLLogQueryDAO(mysqlClient));
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
index 307b3da..d585431 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
@@ -18,23 +18,20 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
-import java.sql.Connection;
-import java.sql.SQLException;
-import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueArray;
+import java.sql.*;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueArray;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*;
@@ -44,11 +41,12 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*;
* @author wusheng
*/
public class MySQLTableInstaller extends H2TableInstaller {
+
private static final Logger logger = LoggerFactory.getLogger(MySQLTableInstaller.class);
public MySQLTableInstaller(ModuleManager moduleManager) {
super(moduleManager);
- /**
+ /*
* Override column because the default column names in core have syntax conflict with MySQL.
*/
this.overrideColumnName("precision", "cal_precision");
@@ -61,15 +59,6 @@ public class MySQLTableInstaller extends H2TableInstaller {
this.createIndexes(jdbcHikariCPClient, model);
}
- @Override protected void deleteTable(Client client, Model model) throws StorageException {
- JDBCHikariCPClient jdbcClient = (JDBCHikariCPClient)client;
- try (Connection connection = jdbcClient.getConnection()) {
- jdbcClient.execute(connection, "drop table " + model.getName());
- } catch (SQLException | JDBCClientException e) {
- throw new StorageException(e.getMessage(), e);
- }
- }
-
@Override
protected String getColumnType(Model model, ColumnName name, Class<?> type) {
if (Integer.class.equals(type) || int.class.equals(type)) {
@@ -109,7 +98,6 @@ public class MySQLTableInstaller extends H2TableInstaller {
return;
default:
createIndexesForAllMetrics(client, model);
-
}
}
@@ -119,9 +107,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
tableIndexSQL.append(model.getName().toUpperCase()).append("_TIME_BUCKET ");
tableIndexSQL.append("ON ").append(model.getName()).append("(").append(SegmentRecord.TIME_BUCKET).append(")");
createIndex(client, connection, model, tableIndexSQL);
- } catch (JDBCClientException e) {
- throw new StorageException(e.getMessage(), e);
- } catch (SQLException e) {
+ } catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
@@ -132,9 +118,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
tableIndexSQL.append(model.getName().toUpperCase()).append("_TIME_BUCKET ");
tableIndexSQL.append("ON ").append(model.getName()).append("(").append(SegmentRecord.TIME_BUCKET).append(")");
createIndex(client, connection, model, tableIndexSQL);
- } catch (JDBCClientException e) {
- throw new StorageException(e.getMessage(), e);
- } catch (SQLException e) {
+ } catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
@@ -160,9 +144,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
tableIndexSQL.append(model.getName().toUpperCase()).append("_TIME_BUCKET ");
tableIndexSQL.append("ON ").append(model.getName()).append("(").append(SegmentRecord.TIME_BUCKET).append(")");
createIndex(client, connection, model, tableIndexSQL);
- } catch (JDBCClientException e) {
- throw new StorageException(e.getMessage(), e);
- } catch (SQLException e) {
+ } catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
@@ -178,9 +160,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
tableIndexSQL.append(model.getName().toUpperCase()).append("_TIME ");
tableIndexSQL.append("ON ").append(model.getName()).append("(").append(RegisterSource.HEARTBEAT_TIME).append(", ").append(RegisterSource.REGISTER_TIME).append(")");
createIndex(client, connection, model, tableIndexSQL);
- } catch (JDBCClientException e) {
- throw new StorageException(e.getMessage(), e);
- } catch (SQLException e) {
+ } catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}