You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/05/16 16:33:38 UTC
[skywalking] branch master updated: Manage models in one place.
(#2695)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 4eb5def Manage models in one place. (#2695)
4eb5def is described below
commit 4eb5def126fcd0c0d0f04a47201341a25668c1a4
Author: 彭勇升 pengys <pe...@apache.org>
AuthorDate: Fri May 17 00:33:29 2019 +0800
Manage models in one place. (#2695)
---
.../oap/server/core/CoreModuleProvider.java | 2 +-
.../core/analysis/StreamAnnotationListener.java | 7 +-
.../analysis/worker/MetricsStreamProcessor.java | 28 ++++++--
.../analysis/worker/RecordStreamProcessor.java | 9 ++-
.../core/analysis/worker/TopNStreamProcessor.java | 9 ++-
.../register/worker/InventoryStreamProcessor.java | 8 ++-
.../server/core/storage/model/IModelSetter.java | 6 +-
.../oap/server/core/storage/model/Model.java | 42 +++++++++---
.../server/core/storage/model/ModelInstaller.java | 27 ++------
.../{annotation => model}/StorageModels.java | 24 +++++--
.../core/storage/ttl/DataTTLKeeperTimer.java | 75 ++++------------------
.../DayTTLCalculator.java} | 14 ++--
.../HourTTLCalculator.java} | 14 ++--
.../MinuteTTLCalculator.java} | 14 ++--
.../MonthTTLCalculator.java} | 14 ++--
.../SecondTTLCalculator.java} | 14 ++--
.../IModelSetter.java => ttl/TTLCalculator.java} | 12 ++--
.../plugin/jdbc/mysql/MySQLTableInstaller.java | 4 +-
18 files changed, 168 insertions(+), 155 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 79879b9..9b443d6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHan
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.annotation.StorageModels;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.*;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
index 4a703fc..df52261 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
@@ -19,11 +19,10 @@
package org.apache.skywalking.oap.server.core.analysis;
import java.lang.annotation.Annotation;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.worker.*;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
-import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
@@ -48,16 +47,12 @@ public class StreamAnnotationListener implements AnnotationListener {
if (stream.processor().equals(InventoryStreamProcessor.class)) {
InventoryStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
- moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(RecordStreamProcessor.class)) {
RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
- moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(MetricsStreamProcessor.class)) {
MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
- moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, true, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(TopNStreamProcessor.class)) {
TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
- moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else {
throw new UnexpectedException("Unknown stream processor.");
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index df224d3..b88c081 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -23,7 +23,9 @@ import lombok.Getter;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
@@ -60,10 +62,28 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " metrics DAO failure.", e);
}
- MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, stream.name());
- MetricsPersistentWorker hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Hour.getName());
- MetricsPersistentWorker dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Day.getName());
- MetricsPersistentWorker monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Month.getName());
+ IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
+ DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
+
+ MetricsPersistentWorker hourPersistentWorker = null;
+ MetricsPersistentWorker dayPersistentWorker = null;
+ MetricsPersistentWorker monthPersistentWorker = null;
+
+ if (configService.shouldToHour()) {
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Hour);
+ hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+ }
+ if (configService.shouldToDay()) {
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Day);
+ dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+ }
+ if (configService.shouldToMonth()) {
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Month);
+ monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+ }
+
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute);
+ MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model.getName());
MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index 3dfb2fb..ff2a636 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -20,10 +20,11 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
@@ -48,6 +49,7 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
@Getter private List<RecordPersistentWorker> persistentWorkers = new ArrayList<>();
+ @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
@@ -61,7 +63,10 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " record DAO failure.", e);
}
- RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, stream.name(), 1000, recordDAO);
+ IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
+ Model model = modelSetter.putIfAbsent(recordClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Second);
+ RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model.getName(), 1000, recordDAO);
+
persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 670ef93..4ba101d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -20,11 +20,12 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
@@ -44,6 +45,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
return PROCESSOR;
}
+ @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
@@ -57,7 +59,10 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " top n record DAO failure.", e);
}
- TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, stream.name(), 50, recordDAO);
+ IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
+ Model model = modelSetter.putIfAbsent(topNClass, stream.name(), stream.scopeId(), stream.storage());
+
+ TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model.getName(), 50, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(topNClass, persistentWorker);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 1ce4c7f..3111902 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -19,10 +19,11 @@
package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
@@ -42,6 +43,7 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
entryWorkers.get(registerSource.getClass()).in(registerSource);
}
+ @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends RegisterSource> inventoryClass) {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO;
@@ -51,7 +53,9 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " register DAO failure.", e);
}
- RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, stream.name(), registerDAO, stream.scopeId());
+ IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
+ Model model = modelSetter.putIfAbsent(inventoryClass, stream.name(), stream.scopeId(), stream.storage());
+ RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
index 9639f11..f81d1b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.model;
+import org.apache.skywalking.oap.server.core.storage.Downsampling;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.library.module.Service;
@@ -25,5 +26,8 @@ import org.apache.skywalking.oap.server.library.module.Service;
* @author peng-yongsheng
*/
public interface IModelSetter extends Service {
- void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+
+ Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage);
+
+ Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 43ce38d..a48c48c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.storage.model;
import java.util.List;
import lombok.Getter;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.storage.Downsampling;
+import org.apache.skywalking.oap.server.core.storage.ttl.*;
/**
* @author peng-yongsheng
@@ -27,21 +30,40 @@ import lombok.Getter;
@Getter
public class Model {
private final String name;
- private final boolean isMetrics;
private final boolean deleteHistory;
private final List<ModelColumn> columns;
- private final int sourceScopeId;
+ private final int scopeId;
+ private final TTLCalculator ttlCalculator;
- public Model(String name, List<ModelColumn> columns, boolean isMetrics, boolean deleteHistory,
- int sourceScopeId) {
- this.name = name;
+ public Model(String name, List<ModelColumn> columns, boolean deleteHistory,
+ int scopeId, Downsampling downsampling) {
this.columns = columns;
- this.isMetrics = isMetrics;
this.deleteHistory = deleteHistory;
- this.sourceScopeId = sourceScopeId;
- }
+ this.scopeId = scopeId;
- public Model copy(String name) {
- return new Model(name, columns, isMetrics, deleteHistory, sourceScopeId);
+ switch (downsampling) {
+ case Minute:
+ this.name = name;
+ this.ttlCalculator = new MinuteTTLCalculator();
+ break;
+ case Hour:
+ this.name = name + Const.ID_SPLIT + Downsampling.Hour.getName();
+ this.ttlCalculator = new HourTTLCalculator();
+ break;
+ case Day:
+ this.name = name + Const.ID_SPLIT + Downsampling.Day.getName();
+ this.ttlCalculator = new DayTTLCalculator();
+ break;
+ case Month:
+ this.name = name + Const.ID_SPLIT + Downsampling.Month.getName();
+ this.ttlCalculator = new MonthTTLCalculator();
+ break;
+ case Second:
+ this.name = name;
+ this.ttlCalculator = new SecondTTLCalculator();
+ break;
+ default:
+ throw new UnexpectedException("Unexpected downsampling setting.");
+ }
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
index e50e66e..8efdf5b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
@@ -18,10 +18,9 @@
package org.apache.skywalking.oap.server.core.storage.model;
-import java.util.*;
+import java.util.List;
import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
@@ -41,39 +40,23 @@ public abstract class ModelInstaller {
public final void install(Client client) throws StorageException {
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
- DownsamplingConfigService downsamplingConfigService = moduleManager.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
List<Model> models = modelGetter.getModels();
- List<Model> allModels = new ArrayList<>();
- models.forEach(model -> {
- if (model.isMetrics()) {
- if (downsamplingConfigService.shouldToHour()) {
- allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Hour.getName()));
- }
- if (downsamplingConfigService.shouldToDay()) {
- allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Day.getName()));
- }
- if (downsamplingConfigService.shouldToMonth()) {
- allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Month.getName()));
- }
- }
- });
- allModels.addAll(models);
-
boolean debug = System.getProperty("debug") != null;
if (RunningMode.isNoInitMode()) {
- for (Model model : allModels) {
+ for (Model model : models) {
while (!isExists(client, model)) {
try {
logger.info("table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model.getName());
Thread.sleep(3000L);
} catch (InterruptedException e) {
+ logger.error(e.getMessage());
}
}
}
} else {
- for (Model model : allModels) {
+ for (Model model : models) {
if (!isExists(client, model)) {
logger.info("table: {} does not exist", model.getName());
createTable(client, model);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
similarity index 78%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 7f3cfe4..89f8057 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -13,16 +13,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.annotation;
+package org.apache.skywalking.oap.server.core.storage.model;
import java.lang.reflect.Field;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.Downsampling;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.slf4j.*;
/**
@@ -38,13 +38,27 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
this.models = new LinkedList<>();
}
- @Override public void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage) {
+ @Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage) {
+ return putIfAbsent(aClass, modelName, scopeId, storage, Downsampling.Minute);
+ }
+
+ @Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling) {
// Check this scope id is valid.
DefaultScopeDefine.nameOf(scopeId);
+
+ for (Model model : models) {
+ if (model.getName().equals(modelName)) {
+ return model;
+ }
+ }
+
List<ModelColumn> modelColumns = new LinkedList<>();
retrieval(aClass, modelName, modelColumns);
- models.add(new Model(modelName, modelColumns, isMetrics, storage.deleteHistory(), scopeId));
+ Model model = new Model(modelName, modelColumns, storage.deleteHistory(), scopeId, downsampling);
+ models.add(model);
+
+ return model;
}
private void retrieval(Class clazz, String modelName, List<ModelColumn> modelColumns) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index ca0eca6..794a9cc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -20,29 +20,18 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import lombok.Setter;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.Downsampling;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -72,63 +61,25 @@ public enum DataTTLKeeperTimer {
return;
}
- TimeBuckets timeBuckets = convertTimeBucket(new DateTime());
logger.info("Beginning to remove expired metrics from the storage.");
- logger.info("Metrics in minute dimension before {}, are going to be removed.", timeBuckets.minuteTimeBucketBefore);
- logger.info("Metrics in hour dimension before {}, are going to be removed.", timeBuckets.hourTimeBucketBefore);
- logger.info("Metrics in day dimension before {}, are going to be removed.", timeBuckets.dayTimeBucketBefore);
- logger.info("Metrics in month dimension before {}, are going to be removed.", timeBuckets.monthTimeBucketBefore);
+
+ DateTime currentTime = new DateTime();
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
- DownsamplingConfigService downsamplingConfigService = moduleManager.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
List<Model> models = modelGetter.getModels();
models.forEach(model -> {
- if (model.isMetrics()) {
- execute(model, model.getName(), timeBuckets.minuteTimeBucketBefore, Metrics.TIME_BUCKET);
-
- if (downsamplingConfigService.shouldToHour()) {
- execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Hour.getName(), timeBuckets.hourTimeBucketBefore, Metrics.TIME_BUCKET);
- }
- if (downsamplingConfigService.shouldToDay()) {
- execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Day.getName(), timeBuckets.dayTimeBucketBefore, Metrics.TIME_BUCKET);
- }
- if (downsamplingConfigService.shouldToMonth()) {
- execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Month.getName(), timeBuckets.monthTimeBucketBefore, Metrics.TIME_BUCKET);
- }
- } else {
- execute(model, model.getName(), timeBuckets.recordDataTTL, Record.TIME_BUCKET);
+ if (model.isDeleteHistory()) {
+ execute(model, model.getTtlCalculator().timeBefore(currentTime, dataTTL));
}
});
}
- TimeBuckets convertTimeBucket(DateTime currentTime) {
- TimeBuckets timeBuckets = new TimeBuckets();
-
- timeBuckets.recordDataTTL = Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getRecordDataTTL()).toString("yyyyMMddHHmmss"));
- timeBuckets.minuteTimeBucketBefore = Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getMinuteMetricsDataTTL()).toString("yyyyMMddHHmm"));
- timeBuckets.hourTimeBucketBefore = Long.valueOf(currentTime.plusHours(0 - dataTTL.getHourMetricsDataTTL()).toString("yyyyMMddHH"));
- timeBuckets.dayTimeBucketBefore = Long.valueOf(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd"));
- timeBuckets.monthTimeBucketBefore = Long.valueOf(currentTime.plusMonths(0 - dataTTL.getMonthMetricsDataTTL()).toString("yyyyMM"));
-
- return timeBuckets;
- }
-
- private void execute(Model model, String modelName, long timeBucketBefore, String timeBucketColumnName) {
+ private void execute(Model model, long timeBucketBefore) {
try {
- if (model.isDeleteHistory()) {
- moduleManager.find(StorageModule.NAME).provider().getService(IHistoryDeleteDAO.class).deleteHistory(modelName, timeBucketColumnName, timeBucketBefore);
- }
+ moduleManager.find(StorageModule.NAME).provider().getService(IHistoryDeleteDAO.class).deleteHistory(model.getName(), Metrics.TIME_BUCKET, timeBucketBefore);
} catch (IOException e) {
- logger.warn("History of {} delete failure, time bucket {}", modelName, timeBucketBefore);
+ logger.warn("History of {} delete failure, time bucket {}", model.getName(), timeBucketBefore);
logger.error(e.getMessage(), e);
}
}
-
- class TimeBuckets {
- private long recordDataTTL;
- private long minuteTimeBucketBefore;
- private long hourTimeBucketBefore;
- private long dayTimeBucketBefore;
- private long monthTimeBucketBefore;
- }
}
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
similarity index 68%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
index 9639f11..d14ad77 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
@@ -13,17 +13,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
/**
* @author peng-yongsheng
*/
-public interface IModelSetter extends Service {
- void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class DayTTLCalculator implements TTLCalculator {
+
+ @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+ return Long.valueOf(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd"));
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
index 9639f11..d041c42 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
@@ -13,17 +13,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
/**
* @author peng-yongsheng
*/
-public interface IModelSetter extends Service {
- void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class HourTTLCalculator implements TTLCalculator {
+
+ @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+ return Long.valueOf(currentTime.plusHours(0 - dataTTL.getHourMetricsDataTTL()).toString("yyyyMMddHH"));
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
index 9639f11..3d69f72 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
@@ -13,17 +13,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
/**
* @author peng-yongsheng
*/
-public interface IModelSetter extends Service {
- void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class MinuteTTLCalculator implements TTLCalculator {
+
+ @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+ return Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getMinuteMetricsDataTTL()).toString("yyyyMMddHHmm"));
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
index 9639f11..a834aa8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
@@ -13,17 +13,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
/**
* @author peng-yongsheng
*/
-public interface IModelSetter extends Service {
- void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class MonthTTLCalculator implements TTLCalculator {
+
+ @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+ return Long.valueOf(currentTime.plusMonths(0 - dataTTL.getMonthMetricsDataTTL()).toString("yyyyMM"));
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
index 9639f11..d53cf89 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
@@ -13,17 +13,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
/**
* @author peng-yongsheng
*/
-public interface IModelSetter extends Service {
- void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class SecondTTLCalculator implements TTLCalculator {
+
+ @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+ return Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getRecordDataTTL()).toString("yyyyMMddHHmmss"));
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
similarity index 70%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
index 9639f11..fb80add 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
@@ -13,17 +13,17 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
/**
* @author peng-yongsheng
*/
-public interface IModelSetter extends Service {
- void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public interface TTLCalculator {
+
+ long timeBefore(DateTime currentTime, DataTTL dataTTL);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
index 5848328..307b3da 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
@@ -79,7 +79,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
} else if (Double.class.equals(type) || double.class.equals(type)) {
return "DOUBLE";
} else if (String.class.equals(type)) {
- if (DefaultScopeDefine.SEGMENT == model.getSourceScopeId()) {
+ if (DefaultScopeDefine.SEGMENT == model.getScopeId()) {
if (name.getName().equals(SegmentRecord.TRACE_ID) || name.getName().equals(SegmentRecord.SEGMENT_ID))
return "VARCHAR(300)";
}
@@ -94,7 +94,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
protected void createIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
- switch (model.getSourceScopeId()) {
+ switch (model.getScopeId()) {
case SERVICE_INVENTORY:
case SERVICE_INSTANCE_INVENTORY:
case NETWORK_ADDRESS: