You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/08/03 04:01:54 UTC

[GitHub] wu-sheng closed pull request #1516: Feature/oap/storage

wu-sheng closed pull request #1516: Feature/oap/storage
URL: https://github.com/apache/incubator-skywalking/pull/1516
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
index 7dcf14f6f..2b1e335e3 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
@@ -16,7 +16,6 @@
  *
  */
 
-
 package org.apache.skywalking.apm.collector.storage.base.dao;
 
 import java.util.List;
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 65360c14c..c34718b9f 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -55,7 +55,7 @@
         <h2.version>1.4.196</h2.version>
         <shardingjdbc.version>2.0.3</shardingjdbc.version>
         <commons-dbcp.version>1.4</commons-dbcp.version>
-        <elasticsearch.version>6.3.1</elasticsearch.version>
+        <elasticsearch.version>6.3.2</elasticsearch.version>
         <joda-time.version>2.9.9</joda-time.version>
         <kubernetes.version>2.0.0</kubernetes.version>
     </properties>
@@ -142,7 +142,7 @@
             </dependency>
             <dependency>
                 <groupId>org.elasticsearch.client</groupId>
-                <artifactId>elasticsearch-rest-client</artifactId>
+                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                 <version>${elasticsearch.version}</version>
             </dependency>
             <dependency>
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 757c34a76..d116ae835 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -48,7 +48,7 @@ public CoreModuleProvider() {
         super();
         this.moduleConfig = new CoreModuleConfig();
         this.indicatorMapper = new IndicatorMapper();
-        this.workerMapper = new WorkerMapper(getManager());
+        this.workerMapper = new WorkerMapper();
     }
 
     @Override public String name() {
@@ -87,7 +87,7 @@ public CoreModuleProvider() {
 
         try {
             indicatorMapper.load();
-            workerMapper.load();
+            workerMapper.load(getManager());
         } catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
new file mode 100644
index 000000000..f290fd059
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core;
+
+/**
+ * @author wu-sheng
+ */
+public class UnexpectedException extends RuntimeException {
+    public UnexpectedException(String message) {
+        super(message);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
index 62d51341e..f9177fe57 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
@@ -20,23 +20,17 @@
 
 import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
 
-    private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
-
-    private final EndpointLatencyAvgRemoteWorker remoter;
-
     public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
         super(moduleManager);
-        this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
     }
 
-    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
-        remoter.in(data);
+    @Override public Class nextWorkerClass() {
+        return EndpointLatencyAvgRemoteWorker.class;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index e339afa73..7054f3d8b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -18,16 +18,33 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
+import java.util.*;
 import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.*;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointLatencyAvgIndicator extends AvgIndicator {
 
-    @Setter @Getter private int id;
+    private static final String NAME = "endpoint_latency_avg";
+    private static final String ID = "id";
+    private static final String SERVICE_ID = "service_id";
+    private static final String SERVICE_INSTANCE_ID = "service_instance_id";
+
+    @Setter @Getter @Column(columnName = ID) private int id;
+    @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
+    @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId;
+
+    @Override public String name() {
+        return NAME;
+    }
+
+    @Override public String id() {
+        return String.valueOf(id);
+    }
 
     @Override public int hashCode() {
         int result = 17;
@@ -56,18 +73,49 @@
     @Override public RemoteData.Builder serialize() {
         RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
         remoteBuilder.setDataIntegers(0, getId());
-        remoteBuilder.setDataIntegers(1, getCount());
+        remoteBuilder.setDataIntegers(1, getServiceId());
+        remoteBuilder.setDataIntegers(2, getServiceInstanceId());
+        remoteBuilder.setDataIntegers(3, getCount());
 
         remoteBuilder.setDataLongs(0, getTimeBucket());
         remoteBuilder.setDataLongs(1, getSummation());
+        remoteBuilder.setDataLongs(2, getValue());
+
         return remoteBuilder;
     }
 
     @Override public void deserialize(RemoteData remoteData) {
         setId(remoteData.getDataIntegers(0));
-        setCount(remoteData.getDataIntegers(1));
+        setServiceId(remoteData.getDataIntegers(1));
+        setServiceInstanceId(remoteData.getDataIntegers(2));
+        setCount(remoteData.getDataIntegers(3));
 
         setTimeBucket(remoteData.getDataLongs(0));
         setSummation(remoteData.getDataLongs(1));
+        setValue(remoteData.getDataLongs(2));
+    }
+
+    @Override public Map<String, Object> toMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put(ID, id);
+        map.put(SERVICE_ID, serviceId);
+        map.put(SERVICE_INSTANCE_ID, serviceInstanceId);
+        map.put(COUNT, getCount());
+        map.put(SUMMATION, getSummation());
+        map.put(VALUE, getValue());
+        map.put(TIME_BUCKET, getTimeBucket());
+        return map;
+    }
+
+    @Override public Indicator newOne(Map<String, Object> dbMap) {
+        EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
+        indicator.setId((Integer)dbMap.get(ID));
+        indicator.setServiceId((Integer)dbMap.get(SERVICE_ID));
+        indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID));
+        indicator.setCount((Integer)dbMap.get(COUNT));
+        indicator.setSummation((Long)dbMap.get(SUMMATION));
+        indicator.setValue((Long)dbMap.get(VALUE));
+        indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET));
+        return indicator;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
index e3c5c2306..288d568fc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
@@ -29,4 +29,8 @@
     public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
         super(moduleManager);
     }
+
+    @Override protected boolean needMergeDBData() {
+        return true;
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index da065f37f..1bae8ec81 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -21,15 +21,21 @@
 import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
 import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
-@IndicatorType(selector = Selector.HashCode)
+@IndicatorType(selector = Selector.HashCode, needMerge = true)
 public abstract class AvgIndicator extends Indicator {
 
-    @Getter @Setter private long summation;
-    @Getter @Setter private int count;
+    protected static final String SUMMATION = "summation";
+    protected static final String COUNT = "count";
+    protected static final String VALUE = "value";
+
+    @Getter @Setter @Column(columnName = SUMMATION) private long summation;
+    @Getter @Setter @Column(columnName = COUNT) private int count;
+    @Getter @Setter @Column(columnName = VALUE) private long value;
 
     @Entrance
     public final void combine(@SourceFrom long summation, @ConstOne int count) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 533379e7d..de932550e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,15 +18,27 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
+import java.util.Map;
 import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
 public abstract class Indicator extends StreamData {
 
-    @Getter @Setter private long timeBucket;
+    protected static final String TIME_BUCKET = "time_bucket";
+
+    @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
+
+    public abstract String id();
 
     public abstract void combine(Indicator indicator);
+
+    public abstract String name();
+
+    public abstract Map<String, Object> toMap();
+
+    public abstract Indicator newOne(Map<String, Object> dbMap);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index 46f03450f..d1ad273e4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -28,4 +28,6 @@
 @Retention(RetentionPolicy.SOURCE)
 public @interface IndicatorType {
     Selector selector();
+
+    boolean needMerge();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
index 8513fa2cc..b60e3f779 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
@@ -79,4 +79,8 @@ public int findIdByClass(Class indicatorClass) {
     public Class<Indicator> findClassById(int id) {
         return idKeyMapping.get(id);
     }
+
+    public Collection<Class<Indicator>> indicatorClasses() {
+        return idKeyMapping.values();
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
index 65d68b440..1fadf906d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
@@ -21,8 +21,10 @@
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
@@ -33,11 +35,14 @@
 
     private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class);
 
+    private Worker worker;
+    private final ModuleManager moduleManager;
     private final DataCarrier<INPUT> dataCarrier;
     private final MergeDataCache<INPUT> mergeDataCache;
     private int messageNum;
 
     public AbstractAggregatorWorker(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>(1, 10000);
         this.dataCarrier.consume(new AggregatorConsumer(this), 1);
@@ -78,7 +83,15 @@ private void sendToNext() {
         mergeDataCache.finishReadingLast();
     }
 
-    protected abstract void onNext(INPUT data);
+    private void onNext(INPUT data) {
+        if (worker == null) {
+            WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+            worker = workerMapper.findInstanceByClass(nextWorkerClass());
+        }
+        worker.in(data);
+    }
+
+    public abstract Class nextWorkerClass();
 
     private void aggregate(INPUT message) {
         mergeDataCache.writing();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
index bcead6792..59f15022a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
@@ -18,18 +18,119 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+import static java.util.Objects.nonNull;
 
 /**
  * @author peng-yongsheng
  */
 public abstract class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> {
 
+    private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentWorker.class);
+
+    private final MergeDataCache<INPUT> mergeDataCache;
+    private final IBatchDAO batchDAO;
+    private final IPersistenceDAO<?, ?, INPUT> persistenceDAO;
+    private final int blockBatchPersistenceSize = 1000;
+
     public AbstractPersistentWorker(ModuleManager moduleManager) {
+        this.mergeDataCache = new MergeDataCache<>();
+        this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
+        this.persistenceDAO = moduleManager.find(StorageModule.NAME).getService(IPersistenceDAO.class);
+    }
+
+    public final Window<MergeDataCollection<INPUT>> getCache() {
+        return mergeDataCache;
     }
 
     @Override public final void in(INPUT input) {
+        if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
+            try {
+                if (getCache().trySwitchPointer()) {
+                    getCache().switchPointer();
+
+                    List<?> collection = buildBatchCollection();
+                    batchDAO.batchPersistence(collection);
+                }
+            } finally {
+                getCache().trySwitchPointerFinally();
+            }
+        }
+        cacheData(input);
+    }
+
+    public final List<?> buildBatchCollection() {
+        List<?> batchCollection = new LinkedList<>();
+        try {
+            while (getCache().getLast().isWriting()) {
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    logger.warn("thread wake up");
+                }
+            }
 
+            if (getCache().getLast().collection() != null) {
+                batchCollection = prepareBatch(getCache().getLast());
+            }
+        } finally {
+            getCache().finishReadingLast();
+        }
+        return batchCollection;
     }
+
+    private List<Object> prepareBatch(MergeDataCollection<INPUT> collection) {
+        List<Object> batchCollection = new LinkedList<>();
+        collection.collection().forEach((id, data) -> {
+            if (needMergeDBData()) {
+                INPUT dbData = null;
+                try {
+                    dbData = persistenceDAO.get(data);
+                } catch (Throwable t) {
+                    logger.error(t.getMessage(), t);
+                }
+                if (nonNull(dbData)) {
+                    dbData.combine(data);
+                    try {
+                        batchCollection.add(persistenceDAO.prepareBatchUpdate(dbData));
+                    } catch (Throwable t) {
+                        logger.error(t.getMessage(), t);
+                    }
+                } else {
+                    try {
+                        batchCollection.add(persistenceDAO.prepareBatchInsert(data));
+                    } catch (Throwable t) {
+                        logger.error(t.getMessage(), t);
+                    }
+                }
+            } else {
+                try {
+                    batchCollection.add(persistenceDAO.prepareBatchInsert(data));
+                } catch (Throwable t) {
+                    logger.error(t.getMessage(), t);
+                }
+            }
+        });
+
+        return batchCollection;
+    }
+
+    private void cacheData(INPUT input) {
+        mergeDataCache.writing();
+        if (mergeDataCache.containsKey(input)) {
+            mergeDataCache.get(input).combine(input);
+        } else {
+            mergeDataCache.put(input);
+        }
+
+        mergeDataCache.finishWriting();
+    }
+
+    protected abstract boolean needMergeDBData();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
index 973243056..5d81b9b46 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
@@ -34,14 +34,12 @@
     private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class);
 
     private int id = 0;
-    private final ModuleManager moduleManager;
     private final Map<Class<Worker>, Integer> classKeyMapping;
     private final Map<Integer, Class<Worker>> idKeyMapping;
     private final Map<Class<Worker>, Worker> classKeyInstanceMapping;
     private final Map<Integer, Worker> idKeyInstanceMapping;
 
-    public WorkerMapper(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    public WorkerMapper() {
         this.classKeyMapping = new HashMap<>();
         this.idKeyMapping = new HashMap<>();
         this.classKeyInstanceMapping = new HashMap<>();
@@ -49,7 +47,7 @@ public WorkerMapper(ModuleManager moduleManager) {
     }
 
     @SuppressWarnings(value = "unchecked")
-    public void load() throws WorkerDefineLoadException {
+    public void load(ModuleManager moduleManager) throws WorkerDefineLoadException {
         try {
             List<String> workerClasses = new LinkedList<>();
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java
index 24fe14d2a..c6be8d2b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java
@@ -20,10 +20,12 @@
 
 import lombok.*;
 import org.apache.skywalking.apm.network.language.agent.SpanLayer;
+import org.apache.skywalking.oap.server.core.receiver.annotation.SourceType;
 
 /**
  * @author peng-yongsheng
  */
+@SourceType
 public class Endpoint extends Source {
     @Override public Scope scope() {
         return Scope.Endpoint;
@@ -31,7 +33,9 @@
 
     @Getter @Setter private int id;
     @Getter @Setter private String name;
+    @Getter @Setter private int serviceId;
     @Getter @Setter private String serviceName;
+    @Getter @Setter private int serviceInstanceId;
     @Getter @Setter private String serviceInstanceName;
     @Getter @Setter private int latency;
     @Getter @Setter private boolean status;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java
new file mode 100644
index 000000000..2065022a3
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.receiver.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.SOURCE)
+public @interface SourceType {
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java
new file mode 100644
index 000000000..db65fd79b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import org.apache.skywalking.oap.server.library.client.Client;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class AbstractDAO<C extends Client> implements DAO {
+    private final C client;
+
+    public AbstractDAO(C client) {
+        this.client = client;
+    }
+
+    public final C getClient() {
+        return client;
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
new file mode 100644
index 000000000..ff1af291c
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface DAO extends Service {
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
new file mode 100644
index 000000000..5c7a88a34
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.List;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface IBatchDAO extends DAO {
+
+    void batchPersistence(List<?> batchCollection);
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
new file mode 100644
index 000000000..2a9d1f9a4
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface IPersistenceDAO<INSERT, UPDATE, INPUT extends Indicator> extends DAO {
+
+    INPUT get(INPUT input) throws IOException;
+
+    INSERT prepareBatchInsert(INPUT input) throws IOException;
+
+    UPDATE prepareBatchUpdate(INPUT input) throws IOException;
+
+    void deleteHistory(Long timeBucketBefore);
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java
new file mode 100644
index 000000000..b9a4d945e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StorageException extends Exception {
+
+    public StorageException(String message) {
+        super(message);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java
new file mode 100644
index 000000000..9e4545c63
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.storage.annotation.ColumnAnnotationRetrieval;
+import org.apache.skywalking.oap.server.core.storage.define.*;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class StorageInstaller {
+
+    private static final Logger logger = LoggerFactory.getLogger(StorageInstaller.class);
+
+    private final ModuleManager moduleManager;
+    private final ColumnAnnotationRetrieval annotationRetrieval;
+
+    public StorageInstaller(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.annotationRetrieval = new ColumnAnnotationRetrieval();
+    }
+
+    public final void install(Client client) throws StorageException {
+        IndicatorMapper indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
+        Collection<Class<Indicator>> indicatorClasses = indicatorMapper.indicatorClasses();
+
+        Boolean debug = System.getProperty("debug") != null;
+        for (Class<Indicator> indicatorClass : indicatorClasses) {
+            List<ColumnDefine> columnDefines = annotationRetrieval.retrieval(indicatorClass);
+
+            String tableName;
+            try {
+                tableName = indicatorClass.newInstance().name();
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new StorageException(e.getMessage());
+            }
+            TableDefine tableDefine = new TableDefine(tableName, columnDefines);
+
+            if (!isExists(client, tableDefine)) {
+                logger.info("table: {} not exists", tableDefine.getName());
+                createTable(client, tableDefine);
+            } else if (debug) {
+                logger.info("table: {} exists", tableDefine.getName());
+                deleteTable(client, tableDefine);
+                createTable(client, tableDefine);
+            }
+            columnCheck(client, tableDefine);
+        }
+    }
+
+    protected abstract boolean isExists(Client client, TableDefine tableDefine) throws StorageException;
+
+    protected abstract void columnCheck(Client client, TableDefine tableDefine) throws StorageException;
+
+    protected abstract void deleteTable(Client client, TableDefine tableDefine) throws StorageException;
+
+    protected abstract void createTable(Client client, TableDefine tableDefine) throws StorageException;
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index ccd7db01b..559eb5c43 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -32,6 +32,6 @@
     }
 
     @Override public Class[] services() {
-        return new Class[] {};
+        return new Class[] {IBatchDAO.class, IPersistenceDAO.class};
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
new file mode 100644
index 000000000..aa6828b6e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Column {
+    String columnName();
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java
new file mode 100644
index 000000000..cf6a3dc1a
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.annotation;
+
+import java.lang.reflect.Field;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.define.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ColumnAnnotationRetrieval {
+
+    private static final Logger logger = LoggerFactory.getLogger(ColumnAnnotationRetrieval.class);
+
+    public List<ColumnDefine> retrieval(Class<Indicator> indicatorClass) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Retrieval column annotation from class {}", indicatorClass.getName());
+        }
+        List<ColumnDefine> columnDefines = new LinkedList<>();
+        retrieval(indicatorClass, columnDefines);
+        return columnDefines;
+    }
+
+    private void retrieval(Class clazz, List<ColumnDefine> columnDefines) {
+        Field[] fields = clazz.getDeclaredFields();
+
+        for (Field field : fields) {
+            if (field.isAnnotationPresent(Column.class)) {
+                Column column = field.getAnnotation(Column.class);
+                columnDefines.add(new ColumnDefine(new ColumnName(column.columnName(), column.columnName()), field.getType()));
+                if (logger.isDebugEnabled()) {
+                    logger.debug("The field named {} with the {} type", column.columnName(), field.getType());
+                }
+            }
+        }
+
+        if (Objects.nonNull(clazz.getSuperclass())) {
+            retrieval(clazz.getSuperclass(), columnDefines);
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java
new file mode 100644
index 000000000..9bf4a87e1
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.define;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ColumnDefine {
+    private final ColumnName columnName;
+    private final Class<?> type;
+
+    public ColumnDefine(ColumnName columnName, Class<?> type) {
+        this.columnName = columnName;
+        this.type = type;
+    }
+
+    public final ColumnName getColumnName() {
+        return columnName;
+    }
+
+    public final Class<?> getType() {
+        return type;
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java
new file mode 100644
index 000000000..8dc8882ec
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.define;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ColumnName {
+    private final String fullName;
+    private final String shortName;
+    private boolean useShortName = false;
+
+    public ColumnName(String fullName, String shortName) {
+        this.fullName = fullName;
+        this.shortName = shortName;
+    }
+
+    public String getName() {
+        return useShortName ? shortName : fullName;
+    }
+
+    public void useShortName() {
+        this.useShortName = true;
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java
new file mode 100644
index 000000000..3a6c3a87e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.define;
+
+/**
+ * @author peng-yongsheng
+ */
+public interface ColumnTypeMapping {
+
+    String transform(Class<?> type);
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java
new file mode 100644
index 000000000..e0f59ffaf
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.define;
+
+import java.util.List;
+
+/**
+ * @author peng-yongsheng
+ */
+public class TableDefine {
+    private final String name;
+    private final List<ColumnDefine> columnDefines;
+
+    public TableDefine(String name, List<ColumnDefine> columnDefines) {
+        this.name = name;
+        this.columnDefines = columnDefines;
+    }
+
+    public final String getName() {
+        return name;
+    }
+
+    public final List<ColumnDefine> getColumnDefines() {
+        return columnDefines;
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
index 17d1189ee..9865ff3ec 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
@@ -18,8 +18,9 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator.define;
 
+import java.util.Map;
 import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.*;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
@@ -35,4 +36,20 @@
 
     @Override public void deserialize(RemoteData remoteData) {
     }
+
+    @Override public String id() {
+        return null;
+    }
+
+    @Override public String name() {
+        return null;
+    }
+
+    @Override public Map<String, Object> toMap() {
+        return null;
+    }
+
+    @Override public Indicator newOne(Map<String, Object> dbMap) {
+        return null;
+    }
 }
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
new file mode 100644
index 000000000..533c1be07
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.LinkedList;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.*;
+import org.apache.skywalking.oap.server.core.storage.define.TableDefine;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StorageInstallerTestCase {
+
+    @Test
+    public void testInstall() throws StorageException, DuplicateProviderException, ServiceNotProvidedException, IndicatorDefineLoadException {
+        IndicatorMapper indicatorMapper = new IndicatorMapper();
+        CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class);
+        CoreModule moduleDefine = Mockito.spy(CoreModule.class);
+        ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
+
+        LinkedList<ModuleProvider> moduleProviders = Whitebox.getInternalState(moduleDefine, "loadedProviders");
+        moduleProviders.add(moduleProvider);
+
+        Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
+        Mockito.when(moduleProvider.getService(IndicatorMapper.class)).thenReturn(indicatorMapper);
+
+        indicatorMapper.load();
+
+        TestStorageInstaller installer = new TestStorageInstaller(moduleManager);
+        installer.install(null);
+    }
+
+    class TestStorageInstaller extends StorageInstaller {
+
+        public TestStorageInstaller(ModuleManager moduleManager) {
+            super(moduleManager);
+        }
+
+        @Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
+            return false;
+        }
+
+        @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
+
+        }
+
+        @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
+
+        }
+
+        @Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
+
+        }
+    }
+}
diff --git a/oap-server/server-library/library-client/pom.xml b/oap-server/server-library/library-client/pom.xml
index 7ceece8d2..98fa2dee5 100644
--- a/oap-server/server-library/library-client/pom.xml
+++ b/oap-server/server-library/library-client/pom.xml
@@ -49,7 +49,7 @@
         </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-client</artifactId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
new file mode 100644
index 000000000..520e3f005
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.*;
+import org.elasticsearch.action.admin.indices.create.*;
+import org.elasticsearch.action.admin.indices.delete.*;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.get.*;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.*;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchClient implements Client {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
+
+    private static final String TYPE = "type";
+    private final String clusterNodes;
+    private final NameSpace namespace;
+    private RestHighLevelClient client;
+
+    public ElasticSearchClient(String clusterNodes, NameSpace namespace) {
+        this.clusterNodes = clusterNodes;
+        this.namespace = namespace;
+    }
+
+    @Override public void initialize() {
+        List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
+
+        client = new RestHighLevelClient(
+            RestClient.builder(pairsList.toArray(new HttpHost[0])));
+    }
+
+    @Override public void shutdown() {
+        try {
+            client.close();
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private List<HttpHost> parseClusterNodes(String nodes) {
+        List<HttpHost> httpHosts = new LinkedList<>();
+        logger.info("elasticsearch cluster nodes: {}", nodes);
+        String[] nodesSplit = nodes.split(",");
+        for (String node : nodesSplit) {
+            String host = node.split(":")[0];
+            String port = node.split(":")[1];
+            httpHosts.add(new HttpHost(host, Integer.valueOf(port)));
+        }
+
+        return httpHosts;
+    }
+
+    public boolean createIndex(String indexName, Settings settings,
+        XContentBuilder mappingBuilder) throws IOException {
+        indexName = formatIndexName(indexName);
+        CreateIndexRequest request = new CreateIndexRequest(indexName);
+        request.settings(settings);
+        request.mapping(TYPE, mappingBuilder);
+        CreateIndexResponse response;
+        response = client.indices().create(request);
+        logger.info("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+        return response.isAcknowledged();
+    }
+
+    public boolean deleteIndex(String indexName) throws IOException {
+        indexName = formatIndexName(indexName);
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        DeleteIndexResponse response;
+        response = client.indices().delete(request);
+        logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+        return response.isAcknowledged();
+    }
+
+    public boolean isExistsIndex(String indexName) throws IOException {
+        indexName = formatIndexName(indexName);
+        GetIndexRequest request = new GetIndexRequest();
+        request.indices(indexName);
+        return client.indices().exists(request);
+    }
+
+    public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
+        indexName = formatIndexName(indexName);
+        SearchRequest searchRequest = new SearchRequest(indexName);
+        searchRequest.types(TYPE);
+        searchRequest.source(searchSourceBuilder);
+        return client.search(searchRequest);
+    }
+
+    public GetResponse get(String indexName, String id) throws IOException {
+        indexName = formatIndexName(indexName);
+        GetRequest request = new GetRequest(indexName, TYPE, id);
+        return client.get(request);
+    }
+
+    public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
+        indexName = formatIndexName(indexName);
+        return new IndexRequest(indexName, TYPE, id).source(source);
+    }
+
+    public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
+        indexName = formatIndexName(indexName);
+        return new UpdateRequest(indexName, TYPE, id).doc(source);
+    }
+
+    public void delete(String indexName, String timeBucketColumnName, long startTimeBucket,
+        long endTimeBucket) throws IOException {
+        indexName = formatIndexName(indexName);
+        Map<String, String> params = Collections.singletonMap("pretty", "true");
+        String jsonString = "{" +
+            "  \"query\": {" +
+            "    \"range\": {" +
+            "      \"" + timeBucketColumnName + "\": {" +
+            "        \"gte\": " + startTimeBucket + "," +
+            "        \"lte\": " + endTimeBucket + "" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}";
+        HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
+        client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
+    }
+
+    private String formatIndexName(String indexName) {
+        if (Objects.nonNull(namespace) && StringUtils.isNotEmpty(namespace.getNameSpace())) {
+            return namespace.getNameSpace() + "_" + indexName;
+        }
+        return indexName;
+    }
+
+    public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval,
+        int concurrentRequests) {
+        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+            @Override
+            public void beforeBulk(long executionId, BulkRequest request) {
+
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request,
+                BulkResponse response) {
+
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
+            }
+        };
+
+        return BulkProcessor.builder(client::bulkAsync, listener)
+            .setBulkActions(bulkActions)
+            .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
+            .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
+            .setConcurrentRequests(concurrentRequests)
+            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
+            .build();
+    }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java
new file mode 100644
index 000000000..bb71d07d5
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import org.apache.skywalking.oap.server.library.client.ClientException;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchClientException extends ClientException {
+
+    public ElasticSearchClientException(String message) {
+        super(message);
+    }
+
+    public ElasticSearchClientException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
new file mode 100644
index 000000000..f696202a3
--- /dev/null
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.library.client.ClientException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.*;
+import org.junit.Assert;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchClientTestCase {
+
+    public static void main(String[] args) throws IOException, ClientException {
+        Settings settings = Settings.builder()
+            .put("number_of_shards", 2)
+            .put("number_of_replicas", 0)
+            .build();
+
+        XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.startObject()
+            .startObject("_all")
+            .field("enabled", false)
+            .endObject()
+            .startObject("properties")
+            .startObject("column1")
+            .field("type", "text")
+            .endObject()
+            .endObject();
+        builder.endObject();
+
+        ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null);
+        client.initialize();
+
+        String indexName = "test";
+        client.createIndex(indexName, settings, builder);
+        Assert.assertTrue(client.isExistsIndex(indexName));
+        client.deleteIndex(indexName);
+        Assert.assertFalse(client.isExistsIndex(indexName));
+
+
+        client.shutdown();
+    }
+}
diff --git a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
index 74c908d2b..527821504 100644
--- a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
+++ b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.oap.server.library.module;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.lang.reflect.Field;
 import java.util.*;
+import org.slf4j.*;
 
 /**
  * A module definition.
@@ -31,7 +29,7 @@
  */
 public abstract class ModuleDefine {
 
-    private final Logger logger = LoggerFactory.getLogger(ModuleDefine.class);
+    private static final Logger logger = LoggerFactory.getLogger(ModuleDefine.class);
 
     private LinkedList<ModuleProvider> loadedProviders = new LinkedList<>();
 
@@ -128,7 +126,7 @@ private Field getDeclaredField(Class<?> destClass, String fieldName) throws NoSu
         return loadedProviders;
     }
 
-    final ModuleProvider provider() throws DuplicateProviderException {
+    public final ModuleProvider provider() throws DuplicateProviderException {
         if (loadedProviders.size() > 1) {
             throw new DuplicateProviderException(this.name() + " module exist " + loadedProviders.size() + " providers");
         }
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index cabf8a2eb..46c35354c 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -35,6 +35,20 @@ core:
     gRPCPort: 11800
 storage:
   elasticsearch:
+    clusterNodes: localhost:9200
+    indexShardsNumber: 2
+    indexReplicasNumber: 0
+    # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+    bulkActions: 2000 # Execute the bulk every 2000 requests
+    bulkSize: 20 # flush the bulk every 20mb
+    flushInterval: 10 # flush the bulk every 10 seconds whatever the number of requests
+    concurrentRequests: 2 # the number of concurrent requests
+    # Set a timeout on metric data. After the timeout has expired, the metric data will automatically be deleted.
+    traceDataTTL: 90 # Unit is minute
+    minuteMetricDataTTL: 90 # Unit is minute
+    hourMetricDataTTL: 36 # Unit is hour
+    dayMetricDataTTL: 45 # Unit is day
+    monthMetricDataTTL: 18 # Unit is month
 service-mesh:
   default:
 query:
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
index e0adc4370..6c93cb69e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
@@ -36,5 +36,10 @@
             <artifactId>server-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>library-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 92768ad20..09b0bb250 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
 
+import lombok.*;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 
 /**
@@ -25,6 +26,8 @@
  */
 public class StorageModuleElasticsearchConfig extends ModuleConfig {
 
+    @Setter @Getter private String nameSpace;
+    @Setter @Getter private String clusterNodes;
     private int indexShardsNumber;
     private int indexReplicasNumber;
     private boolean highPerformanceMode;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 23367a532..b4aa7e8eb 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -18,8 +18,11 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
 
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.library.client.NameSpace;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
 import org.slf4j.*;
 
 /**
@@ -29,11 +32,14 @@
 
     private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
 
-    private final StorageModuleElasticsearchConfig storageConfig;
+    private final StorageModuleElasticsearchConfig config;
+    private final NameSpace nameSpace;
+    private ElasticSearchClient elasticSearchClient;
 
     public StorageModuleElasticsearchProvider() {
         super();
-        this.storageConfig = new StorageModuleElasticsearchConfig();
+        this.config = new StorageModuleElasticsearchConfig();
+        this.nameSpace = new NameSpace();
     }
 
     @Override
@@ -42,21 +48,34 @@ public String name() {
     }
 
     @Override
-    public Class module() {
+    public Class<? extends ModuleDefine> module() {
         return StorageModule.class;
     }
 
     @Override
     public ModuleConfig createConfigBeanIfAbsent() {
-        return storageConfig;
+        return config;
     }
 
     @Override
     public void prepare() throws ServiceNotProvidedException {
+        elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), nameSpace);
+
+        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
+        this.registerServiceImplementation(IPersistenceDAO.class, new PersistenceEsDAO(elasticSearchClient, nameSpace));
     }
 
     @Override
     public void start() throws ModuleStartException {
+        try {
+            nameSpace.setNameSpace(config.getNameSpace());
+            elasticSearchClient.initialize();
+
+            StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
+            installer.install(elasticSearchClient);
+        } catch (StorageException e) {
+            throw new ModuleStartException(e.getMessage(), e);
+        }
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
new file mode 100644
index 000000000..0e9ff2fe7
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchProcessEsDAO.class);
+
+    private BulkProcessor bulkProcessor;
+    private final int bulkActions;
+    private final int bulkSize;
+    private final int flushInterval;
+    private final int concurrentRequests;
+
+    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
+        int concurrentRequests) {
+        super(client);
+        this.bulkActions = bulkActions;
+        this.bulkSize = bulkSize;
+        this.flushInterval = flushInterval;
+        this.concurrentRequests = concurrentRequests;
+    }
+
+    @Override public void batchPersistence(List<?> batchCollection) {
+        if (bulkProcessor == null) {
+            this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests);
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("bulk data size: {}", batchCollection.size());
+        }
+
+        if (CollectionUtils.isNotEmpty(batchCollection)) {
+            batchCollection.forEach(builder -> {
+                if (builder instanceof IndexRequest) {
+                    this.bulkProcessor.add((IndexRequest)builder);
+                }
+                if (builder instanceof UpdateRequest) {
+                    this.bulkProcessor.add((UpdateRequest)builder);
+                }
+            });
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
new file mode 100644
index 000000000..8e268c1ce
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.storage.define.ColumnTypeMapping;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ColumnTypeEsMapping implements ColumnTypeMapping {
+
+    @Override public String transform(Class<?> type) {
+        if (Integer.class.equals(type) || int.class.equals(type)) {
+            return "integer";
+        } else if (Long.class.equals(type) || long.class.equals(type)) {
+            return "long";
+        } else if (Double.class.equals(type) || double.class.equals(type)) {
+            return "double";
+        } else if (String.class.equals(type)) {
+            return "text";
+        } else {
+            throw new IllegalArgumentException("Unsupported data type: " + type.getName());
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
new file mode 100644
index 000000000..dd0a70bd8
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+
+    private static final Logger logger = LoggerFactory.getLogger(EsDAO.class);
+
+    public EsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    protected final int getMaxId(String indexName, String columnName) {
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.aggregation(AggregationBuilders.max("agg").field(columnName));
+        searchSourceBuilder.size(0);
+        return getResponse(indexName, searchSourceBuilder);
+    }
+
+    protected final int getMinId(String indexName, String columnName) {
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.aggregation(AggregationBuilders.min("agg").field(columnName));
+        searchSourceBuilder.size(0);
+        return getResponse(indexName, searchSourceBuilder);
+    }
+
+    private int getResponse(String indexName, SearchSourceBuilder searchSourceBuilder) {
+        try {
+            SearchResponse searchResponse = getClient().search(indexName, searchSourceBuilder);
+            Max agg = searchResponse.getAggregations().get("agg");
+
+            int id = (int)agg.getValue();
+            if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
+                return 0;
+            } else {
+                return id;
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+        return 0;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
new file mode 100644
index 000000000..1f83b633a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.IPersistenceDAO;
+import org.apache.skywalking.oap.server.library.client.NameSpace;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class PersistenceEsDAO implements IPersistenceDAO<IndexRequest, UpdateRequest, Indicator> {
+
+    private final ElasticSearchClient client;
+    private final NameSpace nameSpace;
+
+    public PersistenceEsDAO(ElasticSearchClient client, NameSpace nameSpace) {
+        this.client = client;
+        this.nameSpace = nameSpace;
+    }
+
+    @Override public Indicator get(Indicator input) throws IOException {
+        GetResponse response = client.get(nameSpace.getNameSpace() + "_" + input.name(), input.id());
+        if (response.isExists()) {
+            return input.newOne(response.getSource());
+        } else {
+            return null;
+        }
+    }
+
+    @Override public IndexRequest prepareBatchInsert(Indicator input) throws IOException {
+        Map<String, Object> objectMap = input.toMap();
+
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+        for (String key : objectMap.keySet()) {
+            builder.field(key, objectMap.get(key));
+        }
+        builder.endObject();
+        return client.prepareInsert(nameSpace.getNameSpace() + "_" + input.name(), input.id(), builder);
+    }
+
+    @Override public UpdateRequest prepareBatchUpdate(Indicator input) throws IOException {
+        Map<String, Object> objectMap = input.toMap();
+
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+        for (String key : objectMap.keySet()) {
+            builder.field(key, objectMap.get(key));
+        }
+        builder.endObject();
+        return client.prepareUpdate(nameSpace.getNameSpace() + "_" + input.name(), input.id(), builder);
+    }
+
+    @Override public void deleteHistory(Long timeBucketBefore) {
+
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
new file mode 100644
index 000000000..07868d808
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.define.*;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StorageEsInstaller extends StorageInstaller {
+
+    private static final Logger logger = LoggerFactory.getLogger(StorageEsInstaller.class);
+
+    private final int indexShardsNumber;
+    private final int indexReplicasNumber;
+    private final ColumnTypeEsMapping mapping;
+
+    public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) {
+        super(moduleManager);
+        this.indexShardsNumber = indexShardsNumber;
+        this.indexReplicasNumber = indexReplicasNumber;
+        this.mapping = new ColumnTypeEsMapping();
+    }
+
+    @Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+        try {
+            return esClient.isExistsIndex(tableDefine.getName());
+        } catch (IOException e) {
+            throw new StorageException(e.getMessage());
+        }
+    }
+
+    @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
+
+    }
+
+    @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+
+        try {
+            if (!esClient.deleteIndex(tableDefine.getName())) {
+                throw new StorageException(tableDefine.getName() + " index delete failure.");
+            }
+        } catch (IOException e) {
+            throw new StorageException(tableDefine.getName() + " index delete failure.");
+        }
+    }
+
+    @Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+
+        // mapping
+        XContentBuilder mappingBuilder = null;
+
+        Settings settings = createSettingBuilder();
+        try {
+            mappingBuilder = createMappingBuilder(tableDefine);
+            logger.info("mapping builder str: {}", mappingBuilder.prettyPrint());
+        } catch (Exception e) {
+            logger.error("create {} index mapping builder error", tableDefine.getName());
+        }
+
+        boolean isAcknowledged;
+        try {
+            isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mappingBuilder);
+        } catch (IOException e) {
+            throw new StorageException(e.getMessage());
+        }
+        logger.info("create {} index finished, isAcknowledged: {}", tableDefine.getName(), isAcknowledged);
+
+        if (!isAcknowledged) {
+            throw new StorageException("create " + tableDefine.getName() + " index failure, ");
+        }
+    }
+
+    private Settings createSettingBuilder() {
+        return Settings.builder()
+            .put("index.number_of_shards", indexShardsNumber)
+            .put("index.number_of_replicas", indexReplicasNumber)
+            .put("index.refresh_interval", "3s")
+            .put("analysis.analyzer.collector_analyzer.type", "stop")
+            .build();
+    }
+
+    private XContentBuilder createMappingBuilder(TableDefine tableDefine) throws IOException {
+        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
+            .startObject()
+            .startObject("_all")
+            .field("enabled", false)
+            .endObject()
+            .startObject("properties");
+
+        for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
+            mappingBuilder
+                .startObject(columnDefine.getColumnName().getName())
+                .field("type", mapping.transform(columnDefine.getType()))
+                .endObject();
+        }
+
+        mappingBuilder
+            .endObject()
+            .endObject();
+
+        logger.debug("create elasticsearch index: {}", mappingBuilder.prettyPrint());
+
+        return mappingBuilder;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
new file mode 100644
index 000000000..2553ab7b9
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.junit.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchColumnTypeMappingTestCase {
+
+    @Test
+    public void test() {
+        ColumnTypeEsMapping mapping = new ColumnTypeEsMapping();
+
+        Assert.assertEquals("integer", mapping.transform(int.class));
+        Assert.assertEquals("integer", mapping.transform(Integer.class));
+
+        Assert.assertEquals("long", mapping.transform(long.class));
+        Assert.assertEquals("long", mapping.transform(Long.class));
+
+        Assert.assertEquals("double", mapping.transform(double.class));
+        Assert.assertEquals("double", mapping.transform(Double.class));
+
+        Assert.assertEquals("text", mapping.transform(String.class));
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services