You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/02/07 00:44:55 UTC

[skywalking] branch storagebuilder created (now 1a836ca)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch storagebuilder
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at 1a836ca  Support multiple implementations of StorageBuilder in different storage implementations - stage 2.

This branch includes the following new commits:

     new 1a836ca  Support multiple implementations of StorageBuilder in different storage implementations - stage 2.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Support multiple implementations of StorageBuilder in different storage implementations - stage 2.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch storagebuilder
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 1a836ca2d4111ee75bc67bb3f87eae25ecf1ec6d
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 7 08:43:17 2021 +0800

    Support multiple implementations of StorageBuilder in different storage implementations - stage 2.
---
 .../provider/meter/process/MeterBaseTest.java      | 124 ---------------------
 .../provider/meter/process/MeterBuilderTest.java   |  92 ---------------
 .../org/apache/skywalking/oal/rt/OALRuntime.java   |  14 ++-
 .../core/analysis/StreamAnnotationListener.java    |   4 +
 .../analysis/worker/ManagementStreamProcessor.java |  15 ++-
 .../analysis/worker/MetricsStreamProcessor.java    |  17 +--
 .../core/analysis/worker/NoneStreamProcessor.java  |  17 +--
 .../analysis/worker/RecordStreamProcessor.java     |  17 +--
 .../core/analysis/worker/TopNStreamProcessor.java  |  21 ++--
 .../oap/server/core/oal/rt/OALEngine.java          |   3 +
 .../server/core/oal/rt/OALEngineLoaderService.java |   5 +
 .../server/core/storage/StorageBuilderFactory.java |  80 +++++++++++++
 .../oap/server/core/storage/StorageModule.java     |  39 +++----
 .../StorageModuleElasticsearchProvider.java        |   3 +
 .../plugin/influxdb/InfluxStorageProvider.java     |   3 +
 .../storage/plugin/jdbc/h2/H2StorageProvider.java  |   3 +
 .../plugin/jdbc/mysql/MySQLStorageProvider.java    |   3 +
 17 files changed, 186 insertions(+), 274 deletions(-)

diff --git a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterBaseTest.java b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterBaseTest.java
deleted file mode 100644
index e0b5cdf..0000000
--- a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterBaseTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
-
-import com.google.common.collect.Maps;
-import org.apache.skywalking.apm.network.language.agent.v3.Label;
-import org.apache.skywalking.apm.network.language.agent.v3.MeterBucketValue;
-import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
-import org.apache.skywalking.apm.network.language.agent.v3.MeterHistogram;
-import org.apache.skywalking.apm.network.language.agent.v3.MeterSingleValue;
-import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
-import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfigs;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.CoreModuleProvider;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
-import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
-import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgFunction;
-import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramFunction;
-import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramPercentileFunction;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.junit.Before;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
-
-import java.util.HashMap;
-import java.util.List;
-
-import static org.mockito.Mockito.when;
-
-public abstract class MeterBaseTest {
-    private static final String CONFIG_PATH = "meter-analyzer-config";
-
-    @Mock
-    protected CoreModuleProvider moduleProvider;
-    @Mock
-    protected ModuleManager moduleManager;
-
-    protected MeterSystem meterSystem;
-    protected MeterProcessor processor;
-    protected long timestamp;
-
-    @Before
-    public void init() throws Exception {
-        // prepare the context
-        meterSystem = Mockito.spy(new MeterSystem(moduleManager));
-        CoreModule coreModule = Mockito.spy(CoreModule.class);
-
-        // disable meter register
-        DisableRegister.INSTANCE.add("meter_build_test1");
-        DisableRegister.INSTANCE.add("meter_build_test2");
-        DisableRegister.INSTANCE.add("meter_build_test3");
-
-        Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider);
-        when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule);
-
-        when(moduleProvider.getService(MeterSystem.class))
-            .thenReturn(meterSystem);
-
-        // prepare the meter functions
-        final HashMap<String, Class> map = Maps.newHashMap();
-        map.put("avg", AvgFunction.class);
-        map.put("avgHistogram", AvgHistogramFunction.class);
-        map.put("avgHistogramPercentile", AvgHistogramPercentileFunction.class);
-        Whitebox.setInternalState(meterSystem, "functionRegister", map);
-
-        // load context
-        List<MeterConfig> meterConfigs = MeterConfigs.loadConfig(CONFIG_PATH, new String[] {"config.yaml"});
-        final MeterProcessService service = new MeterProcessService(moduleManager);
-        service.start(meterConfigs);
-
-        // create process and read meters
-        processor = service.createProcessor();
-
-        timestamp = System.currentTimeMillis();
-        // single value
-        processor.read(MeterData.newBuilder()
-                                .setService("service").setServiceInstance("instance").setTimestamp(timestamp)
-                                .setSingleValue(MeterSingleValue.newBuilder().setName("test_count1")
-                                                                .addLabels(Label.newBuilder()
-                                                                                .setName("k1")
-                                                                                .setValue("v1")
-                                                                                .build()).setValue(1).build())
-                                .build());
-
-        // histogram
-        processor.read(MeterData.newBuilder()
-                                .setHistogram(MeterHistogram.newBuilder().setName("test_histogram")
-                                                            .addLabels(
-                                                                Label.newBuilder().setName("k2").setValue("v2").build())
-                                                            .addLabels(
-                                                                Label.newBuilder().setName("endpoint").setValue("test_endpoint").build())
-                                                            .addValues(MeterBucketValue.newBuilder()
-                                                                                       .setBucket(1)
-                                                                                       .setCount(10)
-                                                                                       .build())
-                                                            .addValues(MeterBucketValue.newBuilder()
-                                                                                       .setBucket(5)
-                                                                                       .setCount(15)
-                                                                                       .build())
-                                                            .addValues(MeterBucketValue.newBuilder()
-                                                                                       .setBucket(10)
-                                                                                       .setCount(3)
-                                                                                       .build())
-                                                            .build())
-                                .build());
-    }
-}
diff --git a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterBuilderTest.java b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterBuilderTest.java
deleted file mode 100644
index f455438..0000000
--- a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/meter/process/MeterBuilderTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
-
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
-import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgFunction;
-import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramFunction;
-import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramPercentileFunction;
-import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*", "org.w3c.*"})
-public class MeterBuilderTest extends MeterBaseTest {
-
-    @Test
-    public void testBuildAndSend() throws ModuleStartException {
-        List<AcceptableValue> values = new ArrayList<>();
-        doAnswer(invocationOnMock -> {
-            values.add(invocationOnMock.getArgument(0, AcceptableValue.class));
-            return null;
-        }).when(meterSystem).doStreamingCalculation(any());
-
-        // Prcess the meters
-        processor.process();
-
-        Assert.assertEquals(3, values.size());
-        // Check avg
-        final AvgFunction avg = (AvgFunction) values.get(0);
-        Assert.assertEquals(1, avg.getSummation());
-        Assert.assertEquals(1, avg.getCount());
-        Assert.assertEquals(IDManager.ServiceID.buildId("service", true), avg.getServiceId());
-        Assert.assertEquals(IDManager.ServiceID.buildId("service", true), avg.getEntityId());
-        Assert.assertEquals(TimeBucket.getMinuteTimeBucket(timestamp), avg.getTimeBucket());
-
-        // Check avgHistogram
-        final AvgHistogramFunction avgHistogram = (AvgHistogramFunction) values.get(1);
-        verifyDataTable(avgHistogram.getSummation(), 1, 10, 5, 15, 10, 3);
-        verifyDataTable(avgHistogram.getCount(), 1, 1, 5, 1, 10, 1);
-        Assert.assertEquals(IDManager.ServiceInstanceID.buildId(
-            IDManager.ServiceID.buildId("service", true), "instance"), avgHistogram.getEntityId());
-        Assert.assertEquals(TimeBucket.getMinuteTimeBucket(timestamp), avgHistogram.getTimeBucket());
-
-        // Check avgHistogramPercentile
-        final AvgHistogramPercentileFunction avgPercentile = (AvgHistogramPercentileFunction) values.get(2);
-        Assert.assertEquals(3, avgPercentile.getRanks().size());
-        Assert.assertEquals(50, avgPercentile.getRanks().get(0));
-        Assert.assertEquals(90, avgPercentile.getRanks().get(1));
-        Assert.assertEquals(99, avgPercentile.getRanks().get(2));
-        Assert.assertEquals(IDManager.EndpointID.buildId(
-            IDManager.ServiceID.buildId("service", true), "test_endpoint"), avgPercentile.getEntityId());
-        verifyDataTable(avgPercentile.getSummation(), 1, 10, 5, 15, 10, 3);
-        verifyDataTable(avgPercentile.getCount(), 1, 1, 5, 1, 10, 1);
-    }
-
-    private void verifyDataTable(DataTable table, Object... data) {
-        Assert.assertEquals(data.length / 2, table.size());
-        for (int i = 0; i < data.length; i += 2) {
-            Assert.assertEquals(
-                Long.parseLong(String.valueOf(data[i + 1])), table.get(String.valueOf(data[i])).longValue());
-        }
-    }
-}
diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java
index 334cd3d..e411b41 100644
--- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java
+++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java
@@ -65,6 +65,7 @@ import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
 import org.apache.skywalking.oap.server.core.oal.rt.OALCompileException;
 import org.apache.skywalking.oap.server.core.oal.rt.OALDefine;
 import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -80,7 +81,6 @@ public class OALRuntime implements OALEngine {
     private static final String CLASS_FILE_CHARSET = "UTF-8";
     private static final String METRICS_FUNCTION_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.metrics.";
     private static final String WITH_METADATA_INTERFACE = "org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata";
-    private static final String STORAGE_BUILDER_INTERFACE = "org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder";
     private static final String DISPATCHER_INTERFACE = "org.apache.skywalking.oap.server.core.analysis.SourceDispatcher";
     private static final String METRICS_STREAM_PROCESSOR = "org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor";
     private static final String[] METRICS_CLASS_METHODS = {
@@ -107,6 +107,7 @@ public class OALRuntime implements OALEngine {
     private AllDispatcherContext allDispatcherContext;
     private StreamAnnotationListener streamAnnotationListener;
     private DispatcherDetectorListener dispatcherDetectorListener;
+    private StorageBuilderFactory storageBuilderFactory;
     private final List<Class> metricsClasses;
     private final List<Class> dispatcherClasses;
     private final boolean openEngineDebug;
@@ -134,6 +135,11 @@ public class OALRuntime implements OALEngine {
     }
 
     @Override
+    public void setStorageBuilderFactory(final StorageBuilderFactory factory) {
+        storageBuilderFactory = factory;
+    }
+
+    @Override
     public void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException {
         if (!IS_RT_TEMP_FOLDER_INIT_COMPLETED) {
             prepareRTTempFolder();
@@ -318,7 +324,7 @@ public class OALRuntime implements OALEngine {
         String className = metricsBuilderClassName(metricsStmt, false);
         CtClass metricsBuilderClass = classPool.makeClass(metricsBuilderClassName(metricsStmt, true));
         try {
-            metricsBuilderClass.addInterface(classPool.get(STORAGE_BUILDER_INTERFACE));
+            metricsBuilderClass.addInterface(classPool.get(storageBuilderFactory.builderTemplate().getSuperClass()));
         } catch (NotFoundException e) {
             log.error("Can't find StorageBuilder interface for " + className + ".", e);
             throw new OALCompileException(e.getMessage(), e);
@@ -342,7 +348,9 @@ public class OALRuntime implements OALEngine {
         for (String method : METRICS_BUILDER_CLASS_METHODS) {
             StringWriter methodEntity = new StringWriter();
             try {
-                configuration.getTemplate("metrics-builder/" + method + ".ftl").process(metricsStmt, methodEntity);
+                configuration
+                    .getTemplate(storageBuilderFactory.builderTemplate().getTemplatePath() + "/" + method + ".ftl")
+                    .process(metricsStmt, methodEntity);
                 metricsBuilderClass.addMethod(CtNewMethod.make(methodEntity.toString(), metricsBuilderClass));
             } catch (Exception e) {
                 log.error("Can't generate method " + method + " for " + className + ".", e);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
index 052ea50..5febdf5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
@@ -51,6 +51,10 @@ public class StreamAnnotationListener implements AnnotationListener {
         if (aClass.isAnnotationPresent(Stream.class)) {
             Stream stream = (Stream) aClass.getAnnotation(Stream.class);
 
+            if (DisableRegister.INSTANCE.include(stream.name())) {
+                return;
+            }
+
             if (stream.processor().equals(RecordStreamProcessor.class)) {
                 RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
             } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
index 8a44a4d..e516128 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
@@ -18,22 +18,24 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
 import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -60,15 +62,16 @@ public class ManagementStreamProcessor implements StreamProcessor<ManagementData
 
     @Override
     public void create(final ModuleDefineHolder moduleDefineHolder, final Stream stream, final Class<? extends ManagementData> streamClass) throws StorageException {
-        if (DisableRegister.INSTANCE.include(stream.name())) {
-            return;
-        }
+        final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
+                                                                              .provider()
+                                                                              .getService(StorageBuilderFactory.class);
+        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(streamClass, stream.builder());
 
         StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IManagementDAO managementDAO;
         try {
-            managementDAO = storageDAO.newManagementDao(stream.builder().newInstance());
-        } catch (InstantiationException | IllegalAccessException e) {
+            managementDAO = storageDAO.newManagementDao(builder.getDeclaredConstructor().newInstance());
+        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
             throw new UnexpectedException("Create " + stream.builder()
                     .getSimpleName() + " none stream record DAO failure.", e);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 4e85b4d..9d8faf7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,7 +27,6 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
 import org.apache.skywalking.oap.server.core.analysis.Stream;
@@ -35,12 +35,14 @@ import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
@@ -103,15 +105,16 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
     public void create(ModuleDefineHolder moduleDefineHolder,
                        StreamDefinition stream,
                        Class<? extends Metrics> metricsClass) throws StorageException {
-        if (DisableRegister.INSTANCE.include(stream.getName())) {
-            return;
-        }
+        final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
+                                                                              .provider()
+                                                                              .getService(StorageBuilderFactory.class);
+        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(metricsClass, stream.getBuilder());
 
         StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IMetricsDAO metricsDAO;
         try {
-            metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());
-        } catch (InstantiationException | IllegalAccessException e) {
+            metricsDAO = storageDAO.newMetricsDao(builder.getDeclaredConstructor().newInstance());
+        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
             throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
         }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
index 8878d44..00cda59 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
@@ -18,22 +18,24 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
 import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -60,15 +62,16 @@ public class NoneStreamProcessor implements StreamProcessor<NoneStream> {
 
     @Override
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends NoneStream> streamClass) throws StorageException {
-        if (DisableRegister.INSTANCE.include(stream.name())) {
-            return;
-        }
+        final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
+                                                                              .provider()
+                                                                              .getService(StorageBuilderFactory.class);
+        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(streamClass, stream.builder());
 
         StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         INoneStreamDAO noneStream;
         try {
-            noneStream = storageDAO.newNoneStreamDao(stream.builder().newInstance());
-        } catch (InstantiationException | IllegalAccessException e) {
+            noneStream = storageDAO.newNoneStreamDao(builder.getDeclaredConstructor().newInstance());
+        } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
             throw new UnexpectedException("Create " + stream.builder()
                                                             .getSimpleName() + " none stream record DAO failure.", e);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index 1b2a725..23a6531 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -18,22 +18,24 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 public class RecordStreamProcessor implements StreamProcessor<Record> {
@@ -57,15 +59,16 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
     @Override
     @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException {
-        if (DisableRegister.INSTANCE.include(stream.name())) {
-            return;
-        }
+        final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
+                                                                              .provider()
+                                                                              .getService(StorageBuilderFactory.class);
+        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(recordClass, stream.builder());
 
         StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IRecordDAO recordDAO;
         try {
-            recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
-        } catch (InstantiationException | IllegalAccessException e) {
+            recordDAO = storageDAO.newRecordDao(builder.getDeclaredConstructor().newInstance());
+        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
             throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
         }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 7222223..5a0c6a4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,19 +27,20 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -66,17 +68,18 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
     @Override
     @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) throws StorageException {
-        if (DisableRegister.INSTANCE.include(stream.name())) {
-            return;
-        }
+        final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
+                                                                              .provider()
+                                                                              .getService(StorageBuilderFactory.class);
+        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(topNClass, stream.builder());
 
         StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IRecordDAO recordDAO;
         try {
-            recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
-        } catch (InstantiationException | IllegalAccessException e) {
-            throw new UnexpectedException("Create " + stream.builder()
-                                                            .getSimpleName() + " top n record DAO failure.", e);
+            recordDAO = storageDAO.newRecordDao(builder.getDeclaredConstructor().newInstance());
+        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+            throw new UnexpectedException(
+                "Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e);
         }
 
         ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngine.java
index 95d29537..fefde96 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngine.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.oal.rt;
 
 import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
 import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 
 /**
@@ -30,6 +31,8 @@ public interface OALEngine {
 
     void setDispatcherListener(DispatcherDetectorListener listener) throws ModuleStartException;
 
+    void setStorageBuilderFactory(StorageBuilderFactory factory);
+
     void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException;
 
     void notifyAllListeners() throws ModuleStartException;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngineLoaderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngineLoaderService.java
index 5d2b0d2..d1f4c63 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngineLoaderService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngineLoaderService.java
@@ -25,6 +25,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -56,6 +58,9 @@ public class OALEngineLoaderService implements Service {
                                                       .provider()
                                                       .getService(SourceReceiver.class)
                                                       .getDispatcherDetectorListener());
+            engine.setStorageBuilderFactory(moduleManager.find(StorageModule.NAME)
+                                                         .provider()
+                                                         .getService(StorageBuilderFactory.class));
 
             engine.start(OALEngineLoaderService.class.getClassLoader());
             engine.notifyAllListeners();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilderFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilderFactory.java
new file mode 100644
index 0000000..19e2430
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilderFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.HashMap;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * StorageBuilderFactory allows a storage implementation to replace the default storage builders, which are
+ * implementations of {@link StorageHashMapBuilder}.
+ * <p>
+ * Typically, a storage needs this to work with a more native format than {@link java.util.HashMap}.
+ */
+public interface StorageBuilderFactory extends Service {
+    /**
+     * @return the template definition (super class and template folder) used by the OAL Engine for builders.
+     */
+    BuilderTemplateDefinition builderTemplate();
+
+    /**
+     * Resolve the runtime builder for a stream data type, given the builder declared by {@link Stream#builder()}.
+     *
+     * @param dataType       the type of the stream data.
+     * @param defaultBuilder the builder statically declared on the stream.
+     * @return the builder class used at runtime.
+     */
+    Class<? extends StorageBuilder> builderOf(Class<? extends StorageData> dataType,
+                                              Class<? extends StorageBuilder> defaultBuilder);
+
+    @Getter
+    @RequiredArgsConstructor
+    class BuilderTemplateDefinition {
+        /**
+         * The name of the parent class of the generated builder.
+         */
+        private final String superClass;
+        /**
+         * The folder holding entity2Storage.ftl and storage2Entity.ftl, the templates used to generate the builder.
+         */
+        private final String templatePath;
+    }
+
+    /**
+     * The default factory. It uses {@link StorageHashMapBuilder} as the generally suitable entity builder base,
+     * which delivers {@link HashMap} to the storage module implementation, and never replaces the declared builder.
+     */
+    class Default implements StorageBuilderFactory {
+        @Override
+        public BuilderTemplateDefinition builderTemplate() {
+            return new BuilderTemplateDefinition(
+                StorageHashMapBuilder.class.getName(), "metrics-builder");
+        }
+
+        @Override
+        public Class<? extends StorageBuilder> builderOf(final Class<? extends StorageData> dataType,
+                                                         final Class<? extends StorageBuilder> defaultBuilder) {
+            return defaultBuilder;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 842a27a..c8e9074 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -49,25 +49,26 @@ public class StorageModule extends ModuleDefine {
 
     @Override
     public Class[] services() {
-        return new Class[]{
-                IBatchDAO.class,
-                StorageDAO.class,
-                IHistoryDeleteDAO.class,
-                INetworkAddressAliasDAO.class,
-                ITopologyQueryDAO.class,
-                IMetricsQueryDAO.class,
-                ITraceQueryDAO.class,
-                IMetadataQueryDAO.class,
-                IAggregationQueryDAO.class,
-                IAlarmQueryDAO.class,
-                ITopNRecordsQueryDAO.class,
-                ILogQueryDAO.class,
-                IProfileTaskQueryDAO.class,
-                IProfileTaskLogQueryDAO.class,
-                IProfileThreadSnapshotQueryDAO.class,
-                UITemplateManagementDAO.class,
-                IBrowserLogQueryDAO.class,
-                IEventQueryDAO.class
+        return new Class[] {
+            StorageBuilderFactory.class,
+            IBatchDAO.class,
+            StorageDAO.class,
+            IHistoryDeleteDAO.class,
+            INetworkAddressAliasDAO.class,
+            ITopologyQueryDAO.class,
+            IMetricsQueryDAO.class,
+            ITraceQueryDAO.class,
+            IMetadataQueryDAO.class,
+            IAggregationQueryDAO.class,
+            IAlarmQueryDAO.class,
+            ITopNRecordsQueryDAO.class,
+            ILogQueryDAO.class,
+            IProfileTaskQueryDAO.class,
+            IProfileTaskLogQueryDAO.class,
+            IProfileThreadSnapshotQueryDAO.class,
+            UITemplateManagementDAO.class,
+            IBrowserLogQueryDAO.class,
+            IEventQueryDAO.class
         };
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index b772e57..9a3f6ae 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -31,6 +31,7 @@ import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -113,6 +114,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
 
     @Override
     public void prepare() throws ServiceNotProvidedException {
+        this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
+
         if (!StringUtil.isEmpty(config.getNameSpace())) {
             config.setNameSpace(config.getNameSpace().toLowerCase());
         }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
index ecdc2ed..e3e0b7d 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -95,6 +96,8 @@ public class InfluxStorageProvider extends ModuleProvider {
 
     @Override
     public void prepare() throws ServiceNotProvidedException {
+        this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
+
         client = new InfluxClient(config);
 
         this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(client));
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 63fa518..98c0612 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -107,6 +108,8 @@ public class H2StorageProvider extends ModuleProvider {
 
     @Override
     public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+        this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
+
         Properties settings = new Properties();
         settings.setProperty("dataSourceClassName", config.getDriver());
         settings.setProperty("dataSource.url", config.getUrl());
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index c1a9315..26e1758 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -98,6 +99,8 @@ public class MySQLStorageProvider extends ModuleProvider {
 
     @Override
     public void prepare() throws ServiceNotProvidedException {
+        this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
+
         mysqlClient = new JDBCHikariCPClient(config.getProperties());
 
         this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));