You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/05/16 16:33:38 UTC

[skywalking] branch master updated: Manage models in one place. (#2695)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eb5def  Manage models in one place. (#2695)
4eb5def is described below

commit 4eb5def126fcd0c0d0f04a47201341a25668c1a4
Author: 彭勇升 pengys <pe...@apache.org>
AuthorDate: Fri May 17 00:33:29 2019 +0800

    Manage models in one place. (#2695)
---
 .../oap/server/core/CoreModuleProvider.java        |  2 +-
 .../core/analysis/StreamAnnotationListener.java    |  7 +-
 .../analysis/worker/MetricsStreamProcessor.java    | 28 ++++++--
 .../analysis/worker/RecordStreamProcessor.java     |  9 ++-
 .../core/analysis/worker/TopNStreamProcessor.java  |  9 ++-
 .../register/worker/InventoryStreamProcessor.java  |  8 ++-
 .../server/core/storage/model/IModelSetter.java    |  6 +-
 .../oap/server/core/storage/model/Model.java       | 42 +++++++++---
 .../server/core/storage/model/ModelInstaller.java  | 27 ++------
 .../{annotation => model}/StorageModels.java       | 24 +++++--
 .../core/storage/ttl/DataTTLKeeperTimer.java       | 75 ++++------------------
 .../DayTTLCalculator.java}                         | 14 ++--
 .../HourTTLCalculator.java}                        | 14 ++--
 .../MinuteTTLCalculator.java}                      | 14 ++--
 .../MonthTTLCalculator.java}                       | 14 ++--
 .../SecondTTLCalculator.java}                      | 14 ++--
 .../IModelSetter.java => ttl/TTLCalculator.java}   | 12 ++--
 .../plugin/jdbc/mysql/MySQLTableInstaller.java     |  4 +-
 18 files changed, 168 insertions(+), 155 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 79879b9..9b443d6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHan
 import org.apache.skywalking.oap.server.core.server.*;
 import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.annotation.StorageModels;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
 import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
 import org.apache.skywalking.oap.server.core.worker.*;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
index 4a703fc..df52261 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
@@ -19,11 +19,10 @@
 package org.apache.skywalking.oap.server.core.analysis;
 
 import java.lang.annotation.Annotation;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.worker.*;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
 import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
-import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -48,16 +47,12 @@ public class StreamAnnotationListener implements AnnotationListener {
 
             if (stream.processor().equals(InventoryStreamProcessor.class)) {
                 InventoryStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
-                moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
             } else if (stream.processor().equals(RecordStreamProcessor.class)) {
                 RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
-                moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
             } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
                 MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
-                moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, true, stream.name(), stream.scopeId(), stream.storage());
             } else if (stream.processor().equals(TopNStreamProcessor.class)) {
                 TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
-                moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
             } else {
                 throw new UnexpectedException("Unknown stream processor.");
             }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index df224d3..b88c081 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -23,7 +23,9 @@ import lombok.Getter;
 import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
 import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -60,10 +62,28 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
             throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " metrics DAO failure.", e);
         }
 
-        MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, stream.name());
-        MetricsPersistentWorker hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Hour.getName());
-        MetricsPersistentWorker dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Day.getName());
-        MetricsPersistentWorker monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Month.getName());
+        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
+        DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
+
+        MetricsPersistentWorker hourPersistentWorker = null;
+        MetricsPersistentWorker dayPersistentWorker = null;
+        MetricsPersistentWorker monthPersistentWorker = null;
+
+        if (configService.shouldToHour()) {
+            Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Hour);
+            hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+        }
+        if (configService.shouldToDay()) {
+            Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Day);
+            dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+        }
+        if (configService.shouldToMonth()) {
+            Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Month);
+            monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model.getName());
+        }
+
+        Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute);
+        MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model.getName());
 
         MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
         MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index 3dfb2fb..ff2a636 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -20,10 +20,11 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
 import lombok.Getter;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -48,6 +49,7 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
 
     @Getter private List<RecordPersistentWorker> persistentWorkers = new ArrayList<>();
 
+    @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) {
         if (DisableRegister.INSTANCE.include(stream.name())) {
             return;
@@ -61,7 +63,10 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
             throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " record DAO failure.", e);
         }
 
-        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, stream.name(), 1000, recordDAO);
+        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
+        Model model = modelSetter.putIfAbsent(recordClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Second);
+        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model.getName(), 1000, recordDAO);
+
         persistentWorkers.add(persistentWorker);
         workers.put(recordClass, persistentWorker);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 670ef93..4ba101d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -20,11 +20,12 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
 import lombok.Getter;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -44,6 +45,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
         return PROCESSOR;
     }
 
+    @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) {
         if (DisableRegister.INSTANCE.include(stream.name())) {
             return;
@@ -57,7 +59,10 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
             throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " top n record DAO failure.", e);
         }
 
-        TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, stream.name(), 50, recordDAO);
+        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
+        Model model = modelSetter.putIfAbsent(topNClass, stream.name(), stream.scopeId(), stream.storage());
+
+        TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model.getName(), 50, recordDAO);
         persistentWorkers.add(persistentWorker);
         workers.put(topNClass, persistentWorker);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 1ce4c7f..3111902 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -19,10 +19,11 @@
 package org.apache.skywalking.oap.server.core.register.worker;
 
 import java.util.*;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -42,6 +43,7 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
         entryWorkers.get(registerSource.getClass()).in(registerSource);
     }
 
+    @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends RegisterSource> inventoryClass) {
         StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IRegisterDAO registerDAO;
@@ -51,7 +53,9 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
             throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " register DAO failure.", e);
         }
 
-        RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, stream.name(), registerDAO, stream.scopeId());
+        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
+        Model model = modelSetter.putIfAbsent(inventoryClass, stream.name(), stream.scopeId(), stream.storage());
+        RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
 
         RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
index 9639f11..f81d1b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.storage.model;
 
+import org.apache.skywalking.oap.server.core.storage.Downsampling;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
 import org.apache.skywalking.oap.server.library.module.Service;
 
@@ -25,5 +26,8 @@ import org.apache.skywalking.oap.server.library.module.Service;
  * @author peng-yongsheng
  */
 public interface IModelSetter extends Service {
-    void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+
+    Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage);
+
+    Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 43ce38d..a48c48c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.storage.model;
 
 import java.util.List;
 import lombok.Getter;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.storage.Downsampling;
+import org.apache.skywalking.oap.server.core.storage.ttl.*;
 
 /**
  * @author peng-yongsheng
@@ -27,21 +30,40 @@ import lombok.Getter;
 @Getter
 public class Model {
     private final String name;
-    private final boolean isMetrics;
     private final boolean deleteHistory;
     private final List<ModelColumn> columns;
-    private final int sourceScopeId;
+    private final int scopeId;
+    private final TTLCalculator ttlCalculator;
 
-    public Model(String name, List<ModelColumn> columns, boolean isMetrics, boolean deleteHistory,
-        int sourceScopeId) {
-        this.name = name;
+    public Model(String name, List<ModelColumn> columns, boolean deleteHistory,
+        int scopeId, Downsampling downsampling) {
         this.columns = columns;
-        this.isMetrics = isMetrics;
         this.deleteHistory = deleteHistory;
-        this.sourceScopeId = sourceScopeId;
-    }
+        this.scopeId = scopeId;
 
-    public Model copy(String name) {
-        return new Model(name, columns, isMetrics, deleteHistory, sourceScopeId);
+        switch (downsampling) {
+            case Minute:
+                this.name = name;
+                this.ttlCalculator = new MinuteTTLCalculator();
+                break;
+            case Hour:
+                this.name = name + Const.ID_SPLIT + Downsampling.Hour.getName();
+                this.ttlCalculator = new HourTTLCalculator();
+                break;
+            case Day:
+                this.name = name + Const.ID_SPLIT + Downsampling.Day.getName();
+                this.ttlCalculator = new DayTTLCalculator();
+                break;
+            case Month:
+                this.name = name + Const.ID_SPLIT + Downsampling.Month.getName();
+                this.ttlCalculator = new MonthTTLCalculator();
+                break;
+            case Second:
+                this.name = name;
+                this.ttlCalculator = new SecondTTLCalculator();
+                break;
+            default:
+                throw new UnexpectedException("Unexpected downsampling setting.");
+        }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
index e50e66e..8efdf5b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
@@ -18,10 +18,9 @@
 
 package org.apache.skywalking.oap.server.core.storage.model;
 
-import java.util.*;
+import java.util.List;
 import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
@@ -41,39 +40,23 @@ public abstract class ModelInstaller {
 
     public final void install(Client client) throws StorageException {
         IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
-        DownsamplingConfigService downsamplingConfigService = moduleManager.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
 
         List<Model> models = modelGetter.getModels();
-        List<Model> allModels = new ArrayList<>();
-        models.forEach(model -> {
-            if (model.isMetrics()) {
-                if (downsamplingConfigService.shouldToHour()) {
-                    allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Hour.getName()));
-                }
-                if (downsamplingConfigService.shouldToDay()) {
-                    allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Day.getName()));
-                }
-                if (downsamplingConfigService.shouldToMonth()) {
-                    allModels.add(model.copy(model.getName() + Const.ID_SPLIT + Downsampling.Month.getName()));
-                }
-            }
-        });
-        allModels.addAll(models);
-
         boolean debug = System.getProperty("debug") != null;
 
         if (RunningMode.isNoInitMode()) {
-            for (Model model : allModels) {
+            for (Model model : models) {
                 while (!isExists(client, model)) {
                     try {
                         logger.info("table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model.getName());
                         Thread.sleep(3000L);
                     } catch (InterruptedException e) {
+                        logger.error(e.getMessage());
                     }
                 }
             }
         } else {
-            for (Model model : allModels) {
+            for (Model model : models) {
                 if (!isExists(client, model)) {
                     logger.info("table: {} does not exist", model.getName());
                     createTable(client, model);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
similarity index 78%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 7f3cfe4..89f8057 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -13,16 +13,16 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.oap.server.core.storage.annotation;
+package org.apache.skywalking.oap.server.core.storage.model;
 
 import java.lang.reflect.Field;
 import java.util.*;
 import lombok.Getter;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.Downsampling;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
 import org.slf4j.*;
 
 /**
@@ -38,13 +38,27 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
         this.models = new LinkedList<>();
     }
 
-    @Override public void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage) {
+    @Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage) {
+        return putIfAbsent(aClass, modelName, scopeId, storage, Downsampling.Minute);
+    }
+
+    @Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling) {
         // Check this scope id is valid.
         DefaultScopeDefine.nameOf(scopeId);
+
+        for (Model model : models) {
+            if (model.getName().equals(modelName)) {
+                return model;
+            }
+        }
+
         List<ModelColumn> modelColumns = new LinkedList<>();
         retrieval(aClass, modelName, modelColumns);
 
-        models.add(new Model(modelName, modelColumns, isMetrics, storage.deleteHistory(), scopeId));
+        Model model = new Model(modelName, modelColumns, storage.deleteHistory(), scopeId, downsampling);
+        models.add(model);
+
+        return model;
     }
 
     private void retrieval(Class clazz, String modelName, List<ModelColumn> modelColumns) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index ca0eca6..794a9cc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -20,29 +20,18 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import lombok.Setter;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.DataTTL;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.Downsampling;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -72,63 +61,25 @@ public enum DataTTLKeeperTimer {
             return;
         }
 
-        TimeBuckets timeBuckets = convertTimeBucket(new DateTime());
         logger.info("Beginning to remove expired metrics from the storage.");
-        logger.info("Metrics in minute dimension before {}, are going to be removed.", timeBuckets.minuteTimeBucketBefore);
-        logger.info("Metrics in hour dimension before {}, are going to be removed.", timeBuckets.hourTimeBucketBefore);
-        logger.info("Metrics in day dimension before {}, are going to be removed.", timeBuckets.dayTimeBucketBefore);
-        logger.info("Metrics in month dimension before {}, are going to be removed.", timeBuckets.monthTimeBucketBefore);
+
+        DateTime currentTime = new DateTime();
 
         IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
-        DownsamplingConfigService downsamplingConfigService = moduleManager.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
         List<Model> models = modelGetter.getModels();
         models.forEach(model -> {
-            if (model.isMetrics()) {
-                execute(model, model.getName(), timeBuckets.minuteTimeBucketBefore, Metrics.TIME_BUCKET);
-
-                if (downsamplingConfigService.shouldToHour()) {
-                    execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Hour.getName(), timeBuckets.hourTimeBucketBefore, Metrics.TIME_BUCKET);
-                }
-                if (downsamplingConfigService.shouldToDay()) {
-                    execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Day.getName(), timeBuckets.dayTimeBucketBefore, Metrics.TIME_BUCKET);
-                }
-                if (downsamplingConfigService.shouldToMonth()) {
-                    execute(model, model.getName() + Const.ID_SPLIT + Downsampling.Month.getName(), timeBuckets.monthTimeBucketBefore, Metrics.TIME_BUCKET);
-                }
-            } else {
-                execute(model, model.getName(), timeBuckets.recordDataTTL, Record.TIME_BUCKET);
+            if (model.isDeleteHistory()) {
+                execute(model, model.getTtlCalculator().timeBefore(currentTime, dataTTL));
             }
         });
     }
 
-    TimeBuckets convertTimeBucket(DateTime currentTime) {
-        TimeBuckets timeBuckets = new TimeBuckets();
-
-        timeBuckets.recordDataTTL = Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getRecordDataTTL()).toString("yyyyMMddHHmmss"));
-        timeBuckets.minuteTimeBucketBefore = Long.valueOf(currentTime.plusMinutes(0 - dataTTL.getMinuteMetricsDataTTL()).toString("yyyyMMddHHmm"));
-        timeBuckets.hourTimeBucketBefore = Long.valueOf(currentTime.plusHours(0 - dataTTL.getHourMetricsDataTTL()).toString("yyyyMMddHH"));
-        timeBuckets.dayTimeBucketBefore = Long.valueOf(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd"));
-        timeBuckets.monthTimeBucketBefore = Long.valueOf(currentTime.plusMonths(0 - dataTTL.getMonthMetricsDataTTL()).toString("yyyyMM"));
-
-        return timeBuckets;
-    }
-
-    private void execute(Model model, String modelName, long timeBucketBefore, String timeBucketColumnName) {
+    private void execute(Model model, long timeBucketBefore) {
         try {
-            if (model.isDeleteHistory()) {
-                moduleManager.find(StorageModule.NAME).provider().getService(IHistoryDeleteDAO.class).deleteHistory(modelName, timeBucketColumnName, timeBucketBefore);
-            }
+            moduleManager.find(StorageModule.NAME).provider().getService(IHistoryDeleteDAO.class).deleteHistory(model.getName(), Metrics.TIME_BUCKET, timeBucketBefore);
         } catch (IOException e) {
-            logger.warn("History of {} delete failure, time bucket {}", modelName, timeBucketBefore);
+            logger.warn("History of {} delete failure, time bucket {}", model.getName(), timeBucketBefore);
             logger.error(e.getMessage(), e);
         }
     }
-
-    class TimeBuckets {
-        private long recordDataTTL;
-        private long minuteTimeBucketBefore;
-        private long hourTimeBucketBefore;
-        private long dayTimeBucketBefore;
-        private long monthTimeBucketBefore;
-    }
 }
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
similarity index 68%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
index 9639f11..d14ad77 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
@@ -13,17 +13,19 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
 
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
 
 /**
  * @author peng-yongsheng
  */
-public interface IModelSetter extends Service {
-    void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class DayTTLCalculator implements TTLCalculator {
+    /** Returns currentTime moved back by the day-metrics TTL (as days), formatted as a yyyyMMdd number. */
+    @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+        return Long.parseLong(currentTime.plusDays(0 - dataTTL.getDayMetricsDataTTL()).toString("yyyyMMdd"));
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
index 9639f11..d041c42 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
@@ -13,17 +13,19 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
 
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
 
 /**
  * @author peng-yongsheng
  */
-public interface IModelSetter extends Service {
-    void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class HourTTLCalculator implements TTLCalculator {
+    /** Returns currentTime moved back by the hour-metrics TTL (as hours), formatted as a yyyyMMddHH number. */
+    @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+        return Long.parseLong(currentTime.plusHours(0 - dataTTL.getHourMetricsDataTTL()).toString("yyyyMMddHH"));
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
index 9639f11..3d69f72 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
@@ -13,17 +13,19 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
 
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
 
 /**
  * @author peng-yongsheng
  */
-public interface IModelSetter extends Service {
-    void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class MinuteTTLCalculator implements TTLCalculator {
+    /** Returns currentTime moved back by the minute-metrics TTL (as minutes), formatted as a yyyyMMddHHmm number. */
+    @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+        return Long.parseLong(currentTime.plusMinutes(0 - dataTTL.getMinuteMetricsDataTTL()).toString("yyyyMMddHHmm"));
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
index 9639f11..a834aa8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
@@ -13,17 +13,19 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
 
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
 
 /**
  * @author peng-yongsheng
  */
-public interface IModelSetter extends Service {
-    void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class MonthTTLCalculator implements TTLCalculator {
+    /** Returns currentTime moved back by the month-metrics TTL (as months), formatted as a yyyyMM number. */
+    @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+        return Long.parseLong(currentTime.plusMonths(0 - dataTTL.getMonthMetricsDataTTL()).toString("yyyyMM"));
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
index 9639f11..d53cf89 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
@@ -13,17 +13,19 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
 
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
 
 /**
  * @author peng-yongsheng
  */
-public interface IModelSetter extends Service {
-    void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public class SecondTTLCalculator implements TTLCalculator {
+    /** Returns currentTime moved back by the record TTL (applied as minutes), formatted as a yyyyMMddHHmmss number. */
+    @Override public long timeBefore(DateTime currentTime, DataTTL dataTTL) {
+        return Long.parseLong(currentTime.plusMinutes(0 - dataTTL.getRecordDataTTL()).toString("yyyyMMddHHmmss"));
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
similarity index 70%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
index 9639f11..fb80add 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
@@ -13,17 +13,17 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.oap.server.core.storage.model;
+package org.apache.skywalking.oap.server.core.storage.ttl;
 
-import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.DataTTL;
+import org.joda.time.DateTime;
 
 /**
  * @author peng-yongsheng
  */
-public interface IModelSetter extends Service {
-    void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
+public interface TTLCalculator {
+    /** Derives a numeric time value from currentTime and dataTTL; presumably the history-deletion cutoff (confirm at caller). */
+    long timeBefore(DateTime currentTime, DataTTL dataTTL);
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
index 5848328..307b3da 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
@@ -79,7 +79,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
         } else if (Double.class.equals(type) || double.class.equals(type)) {
             return "DOUBLE";
         } else if (String.class.equals(type)) {
-            if (DefaultScopeDefine.SEGMENT == model.getSourceScopeId()) {
+            if (DefaultScopeDefine.SEGMENT == model.getScopeId()) {
                 if (name.getName().equals(SegmentRecord.TRACE_ID) || name.getName().equals(SegmentRecord.SEGMENT_ID))
                     return "VARCHAR(300)";
             }
@@ -94,7 +94,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
     }
 
     protected void createIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
-        switch (model.getSourceScopeId()) {
+        switch (model.getScopeId()) {
             case SERVICE_INVENTORY:
             case SERVICE_INSTANCE_INVENTORY:
             case NETWORK_ADDRESS: