You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by pe...@apache.org on 2018/09/09 07:30:46 UTC

[incubator-skywalking] branch master updated: Streaming analysis topology test success. (#1646)

This is an automated email from the ASF dual-hosted git repository.

pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new bc57125  Streaming analysis topology test success. (#1646)
bc57125 is described below

commit bc571259b2bbf11487fa9974c919f5dfc60e0650
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Sun Sep 9 15:30:43 2018 +0800

    Streaming analysis topology test success. (#1646)
    
    * Add generate indicator and dispatcher about service topology but not test.
    
    * Delete call type from service relation.
    
    * Streaming analysis topology test success.
    
    * Fixed the check style error.
---
 .../apache/skywalking/oap/server/core/Const.java   |   1 -
 .../skywalking/oap/server/core/CoreModule.java     |   3 +
 .../oap/server/core/CoreModuleProvider.java        |   6 +
 .../analysis/worker/IndicatorPersistentWorker.java |  12 ++
 .../core/analysis/worker/IndicatorProcess.java     |   3 +
 .../server/core/cache/EndpointInventoryCache.java  |  17 +--
 .../core/cache/NetworkAddressInventoryCache.java   |  23 +--
 .../core/cache/ServiceInstanceInventoryCache.java  |  30 +---
 .../server/core/cache/ServiceInventoryCache.java   |  33 ++--
 .../register/worker/RegisterPersistentWorker.java  |   2 +-
 .../oap/server/core/storage/PersistenceTimer.java  |  99 ++++++++++++
 .../server/library/buffer/DataStreamReader.java    |  20 ++-
 .../oap/server/library/buffer/OffsetStream.java    |   2 +-
 .../library/buffer/BufferStreamTestCase.java       |   3 +-
 .../trace/provider/TraceModuleProvider.java        |  17 ++-
 .../trace/provider/TraceServiceModuleConfig.java}  |  25 ++--
 .../handler/TraceSegmentServiceHandler.java        |  10 +-
 .../trace/provider/parser/SegmentParse.java        |  11 +-
 .../SegmentStandardizationWorker.java              |  26 ++--
 .../server/receiver/trace/mock/AgentDataMock.java  |  76 ++++++++++
 .../server/receiver/trace/mock/ConsumerMock.java   | 164 ++++++++++++++++++++
 .../server/receiver/trace/mock/ProviderMock.java   | 112 ++++++++++++++
 .../server/receiver/trace/mock/RegisterMock.java   | 166 +++++++++++++++++++++
 .../receiver/trace/mock/UniqueIdBuilder.java}      |  32 ++--
 .../src/test}/resources/log4j2.xml                 |  10 +-
 .../src/main/resources/application.yml             |   4 +
 .../server-starter/src/main/resources/log4j2.xml   |   5 +-
 .../plugin/elasticsearch/base/RegisterEsDAO.java   |   2 +-
 28 files changed, 765 insertions(+), 149 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
index 2ce7c97..ca3583a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
@@ -33,7 +33,6 @@ public class Const {
     public static final String UNKNOWN = "Unknown";
     public static final String EXCEPTION = "Exception";
     public static final String EMPTY_STRING = "";
-    public static final String FILE_SUFFIX = "sw";
     public static final int SPAN_TYPE_VIRTUAL = 9;
     public static final String DOMAIN_OPERATION_NAME = "{domain}";
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 1052187..2d12a56 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core;
 
 import java.util.*;
 import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
 import org.apache.skywalking.oap.server.core.register.service.*;
 import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
@@ -42,6 +43,8 @@ public class CoreModule extends ModuleDefine {
 
     @Override public Class[] services() {
         List<Class> classes = new ArrayList<>();
+        classes.add(IComponentLibraryCatalogService.class);
+
         addServerInterface(classes);
         addReceiverInterface(classes);
         addInsideService(classes);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index d01de7d..dc32872 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.Indic
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
 import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.*;
 import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
 import org.apache.skywalking.oap.server.core.register.service.*;
 import org.apache.skywalking.oap.server.core.remote.*;
@@ -30,6 +31,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.*;
 import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
 import org.apache.skywalking.oap.server.core.server.*;
 import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
 import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
 import org.apache.skywalking.oap.server.library.module.*;
@@ -85,6 +87,8 @@ public class CoreModuleProvider extends ModuleProvider {
         this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
         this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
 
+        this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());
+
         this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
 
         this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
@@ -136,6 +140,8 @@ public class CoreModuleProvider extends ModuleProvider {
 
         RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
         this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
+
+        PersistenceTimer.INSTANCE.start(getManager());
     }
 
     @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 3a5bbe5..bca9752 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -73,6 +73,18 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
         cacheData(input);
     }
 
+    public boolean flushAndSwitch() {
+        boolean isSwitch;
+        try {
+            if (isSwitch = getCache().trySwitchPointer()) {
+                getCache().switchPointer();
+            }
+        } finally {
+            getCache().trySwitchPointerFinally();
+        }
+        return isSwitch;
+    }
+
     public final List<?> buildBatchCollection() {
         List<?> batchCollection = new LinkedList<>();
         try {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
index 12cfce2..b87a83c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
+import lombok.Getter;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.storage.*;
@@ -33,6 +34,7 @@ public enum IndicatorProcess {
     INSTANCE;
 
     private Map<Class<? extends Indicator>, IndicatorAggregateWorker> entryWorkers = new HashMap<>();
+    @Getter private List<IndicatorPersistentWorker> persistentWorkers = new ArrayList<>();
 
     public void in(Indicator indicator) {
         entryWorkers.get(indicator.getClass()).in(indicator);
@@ -56,6 +58,7 @@ public enum IndicatorProcess {
         IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
             1000, moduleManager, indicatorDAO, alarmNotifyWorker);
         WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+        persistentWorkers.add(persistentWorker);
 
         IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, persistentWorker);
         WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointInventoryCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointInventoryCache.java
index 8cce526..5e7ff65 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointInventoryCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointInventoryCache.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.cache;
 
 import com.google.common.cache.*;
+import java.util.Objects;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.register.EndpointInventory;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -56,14 +57,9 @@ public class EndpointInventoryCache implements Service {
     public int getEndpointId(int serviceId, String endpointName) {
         String id = EndpointInventory.buildId(serviceId, endpointName);
 
-        int endpointId = Const.NONE;
-        try {
-            endpointId = endpointNameCache.get(id, () -> getCacheDAO().getEndpointId(serviceId, endpointName));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        Integer endpointId = endpointNameCache.getIfPresent(id);
 
-        if (endpointId == Const.NONE) {
+        if (Objects.isNull(endpointId) || endpointId == Const.NONE) {
             endpointId = getCacheDAO().getEndpointId(serviceId, endpointName);
             if (endpointId != Const.NONE) {
                 endpointNameCache.put(id, endpointId);
@@ -73,12 +69,7 @@ public class EndpointInventoryCache implements Service {
     }
 
     public EndpointInventory get(int endpointId) {
-        EndpointInventory endpointInventory = null;
-        try {
-            endpointInventory = endpointIdCache.get(endpointId, () -> getCacheDAO().get(endpointId));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        EndpointInventory endpointInventory = endpointIdCache.getIfPresent(endpointId);
 
         if (isNull(endpointInventory)) {
             endpointInventory = getCacheDAO().get(endpointId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/NetworkAddressInventoryCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/NetworkAddressInventoryCache.java
index 91ed3d9..bbcd66e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/NetworkAddressInventoryCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/NetworkAddressInventoryCache.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.cache;
 
 import com.google.common.cache.*;
+import java.util.Objects;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -54,30 +55,20 @@ public class NetworkAddressInventoryCache implements Service {
     }
 
     public int getAddressId(String networkAddress) {
-        int addressId = Const.NONE;
-        try {
-            addressId = networkAddressCache.get(NetworkAddressInventory.buildId(networkAddress), () -> getCacheDAO().getAddressId(networkAddress));
+        Integer addressId = networkAddressCache.getIfPresent(NetworkAddressInventory.buildId(networkAddress));
 
-            if (addressId == Const.NONE) {
-                addressId = getCacheDAO().getAddressId(networkAddress);
-                if (addressId != Const.NONE) {
-                    networkAddressCache.put(NetworkAddressInventory.buildId(networkAddress), addressId);
-                }
+        if (Objects.isNull(addressId) || addressId == Const.NONE) {
+            addressId = getCacheDAO().getAddressId(networkAddress);
+            if (addressId != Const.NONE) {
+                networkAddressCache.put(NetworkAddressInventory.buildId(networkAddress), addressId);
             }
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
         }
 
         return addressId;
     }
 
     public NetworkAddressInventory get(int addressId) {
-        NetworkAddressInventory networkAddress = null;
-        try {
-            networkAddress = addressIdCache.get(addressId, () -> getCacheDAO().get(addressId));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        NetworkAddressInventory networkAddress = addressIdCache.getIfPresent(addressId);
 
         if (isNull(networkAddress)) {
             networkAddress = getCacheDAO().get(addressId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInstanceInventoryCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInstanceInventoryCache.java
index 0f07b37..777c574 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInstanceInventoryCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInstanceInventoryCache.java
@@ -20,8 +20,9 @@ package org.apache.skywalking.oap.server.core.cache;
 
 import com.google.common.cache.*;
 import java.util.Objects;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.slf4j.*;
@@ -50,18 +51,13 @@ public class ServiceInstanceInventoryCache implements Service {
 
     private IServiceInstanceInventoryCacheDAO getCacheDAO() {
         if (isNull(cacheDAO)) {
-            this.cacheDAO = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
+            this.cacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
         }
         return this.cacheDAO;
     }
 
     public ServiceInstanceInventory get(int serviceInstanceId) {
-        ServiceInstanceInventory serviceInstanceInventory = null;
-        try {
-            serviceInstanceInventory = serviceInstanceIdCache.get(serviceInstanceId, () -> getCacheDAO().get(serviceInstanceId));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        ServiceInstanceInventory serviceInstanceInventory = serviceInstanceIdCache.getIfPresent(serviceInstanceId);
 
         if (Objects.isNull(serviceInstanceInventory)) {
             serviceInstanceInventory = getCacheDAO().get(serviceInstanceId);
@@ -73,14 +69,9 @@ public class ServiceInstanceInventoryCache implements Service {
     }
 
     public int getServiceInstanceId(int serviceId, String serviceInstanceName) {
-        int serviceInstanceId = Const.NONE;
-        try {
-            serviceInstanceId = serviceInstanceNameCache.get(ServiceInstanceInventory.buildId(serviceId, serviceInstanceName), () -> getCacheDAO().getServiceInstanceId(serviceId, serviceInstanceName));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        Integer serviceInstanceId = serviceInstanceNameCache.getIfPresent(ServiceInstanceInventory.buildId(serviceId, serviceInstanceName));
 
-        if (serviceInstanceId == Const.NONE) {
+        if (Objects.isNull(serviceInstanceId) || serviceInstanceId == Const.NONE) {
             serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, serviceInstanceName);
             if (serviceId != Const.NONE) {
                 serviceInstanceNameCache.put(ServiceInstanceInventory.buildId(serviceId, serviceInstanceName), serviceInstanceId);
@@ -90,14 +81,9 @@ public class ServiceInstanceInventoryCache implements Service {
     }
 
     public int getServiceInstanceId(int serviceId, int addressId) {
-        int serviceInstanceId = Const.NONE;
-        try {
-            serviceInstanceId = addressIdCache.get(ServiceInstanceInventory.buildId(serviceId, addressId), () -> getCacheDAO().getServiceInstanceId(serviceId, addressId));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        Integer serviceInstanceId = addressIdCache.getIfPresent(ServiceInstanceInventory.buildId(serviceId, addressId));
 
-        if (serviceInstanceId == Const.NONE) {
+        if (Objects.isNull(serviceInstanceId) || serviceInstanceId == Const.NONE) {
             serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, addressId);
             if (serviceId != Const.NONE) {
                 addressIdCache.put(ServiceInstanceInventory.buildId(serviceId, addressId), serviceInstanceId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInventoryCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInventoryCache.java
index 4213aca..8571a63 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInventoryCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInventoryCache.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.cache;
 
 import com.google.common.cache.*;
+import java.util.Objects;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.register.ServiceInventory;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -54,14 +55,9 @@ public class ServiceInventoryCache implements Service {
     }
 
     public int getServiceId(String serviceName) {
-        int serviceId = Const.NONE;
-        try {
-            serviceId = serviceNameCache.get(ServiceInventory.buildId(serviceName), () -> getCacheDAO().getServiceId(serviceName));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        Integer serviceId = serviceNameCache.getIfPresent(ServiceInventory.buildId(serviceName));
 
-        if (serviceId == Const.NONE) {
+        if (Objects.isNull(serviceId) || serviceId == Const.NONE) {
             serviceId = getCacheDAO().getServiceId(serviceName);
             if (serviceId != Const.NONE) {
                 serviceNameCache.put(ServiceInventory.buildId(serviceName), serviceId);
@@ -71,14 +67,9 @@ public class ServiceInventoryCache implements Service {
     }
 
     public int getServiceId(int addressId) {
-        int serviceId = Const.NONE;
-        try {
-            serviceId = addressIdCache.get(ServiceInventory.buildId(addressId), () -> getCacheDAO().getServiceId(addressId));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        Integer serviceId = addressIdCache.getIfPresent(ServiceInventory.buildId(addressId));
 
-        if (serviceId == Const.NONE) {
+        if (Objects.isNull(serviceId) || serviceId == Const.NONE) {
             serviceId = getCacheDAO().getServiceId(addressId);
             if (serviceId != Const.NONE) {
                 addressIdCache.put(ServiceInventory.buildId(addressId), serviceId);
@@ -88,12 +79,7 @@ public class ServiceInventoryCache implements Service {
     }
 
     public ServiceInventory get(int serviceId) {
-        ServiceInventory serviceInventory = null;
-        try {
-            serviceInventory = serviceIdCache.get(serviceId, () -> getCacheDAO().get(serviceId));
-        } catch (Throwable e) {
-            logger.error(e.getMessage(), e);
-        }
+        ServiceInventory serviceInventory = serviceIdCache.getIfPresent(serviceId);
 
         if (isNull(serviceInventory)) {
             serviceInventory = getCacheDAO().get(serviceId);
@@ -101,6 +87,13 @@ public class ServiceInventoryCache implements Service {
                 serviceIdCache.put(serviceId, serviceInventory);
             }
         }
+
+        if (logger.isDebugEnabled()) {
+            if (Objects.isNull(serviceInventory)) {
+                logger.debug("service id {} not find in cache.", serviceId);
+            }
+        }
+
         return serviceInventory;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 2fb8753..eb94d66 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -66,7 +66,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
                                 registerDAO.forceUpdate(modelName, newSource);
                             } else {
                                 int sequence = registerDAO.max(modelName);
-                                source.setSequence(sequence);
+                                source.setSequence(sequence + 1);
                                 registerDAO.forceInsert(modelName, source);
                             }
                         } catch (Throwable t) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
new file mode 100644
index 0000000..7271f04
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.*;
+import java.util.concurrent.*;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum PersistenceTimer {
+    INSTANCE;
+
+    private static final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
+
+    private Boolean isStarted = false;
+    private final Boolean debug;
+
+    PersistenceTimer() {
+        this.debug = System.getProperty("debug") != null;
+    }
+
+    public void start(ModuleManager moduleManager) {
+        logger.info("persistence timer start");
+        //TODO timer value config
+//        final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
+        final long timeInterval = 3;
+        IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
+
+        if (!isStarted) {
+            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+                new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
+                    t -> logger.error("Extract data and save failure.", t)), 1, timeInterval, TimeUnit.SECONDS);
+
+            this.isStarted = true;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void extractDataAndSave(IBatchDAO batchDAO) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Extract data and save");
+        }
+
+        long startTime = System.currentTimeMillis();
+        try {
+            List batchAllCollection = new LinkedList();
+            IndicatorProcess.INSTANCE.getPersistentWorkers().forEach(worker -> {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("extract {} worker data and save", worker.getClass().getName());
+                }
+
+                if (worker.flushAndSwitch()) {
+                    List<?> batchCollection = worker.buildBatchCollection();
+
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
+                    }
+                    batchAllCollection.addAll(batchCollection);
+                }
+            });
+
+            if (debug) {
+                logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+            }
+            batchDAO.batchPersistence(batchAllCollection);
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (logger.isDebugEnabled()) {
+                logger.debug("persistence data save finish");
+            }
+        }
+
+        if (debug) {
+            logger.info("batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+        }
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
index 049f863..79b2c8e 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
@@ -98,6 +98,10 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
     }
 
     private void read() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Read buffer data");
+        }
+
         try {
             if (readOffset.getOffset() == readingFile.length() && !readOffset.isCurrentWriteFile()) {
                 FileUtils.forceDelete(readingFile);
@@ -108,7 +112,19 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
 
                 MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
                 if (messageType != null) {
-                    callBack.call(messageType);
+                    int i = 0;
+                    while (!callBack.call(messageType)) {
+                        try {
+                            TimeUnit.MILLISECONDS.sleep(500);
+                        } catch (InterruptedException e) {
+                            logger.error(e.getMessage());
+                        }
+
+                        i++;
+                        if (i == 10) {
+                            break;
+                        }
+                    }
                     final int serialized = messageType.getSerializedSize();
                     final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
                     readOffset.setOffset(readOffset.getOffset() + offset);
@@ -120,6 +136,6 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
     }
 
     public interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
-        void call(MESSAGE_TYPE message);
+        boolean call(MESSAGE_TYPE message);
     }
 }
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
index 9fb4ed5..a20f7c9 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
@@ -84,8 +84,8 @@ class OffsetStream {
     void flush() {
         try {
             String offsetRecord = offset.serialize();
-            logger.debug("flush offset, record: {}", offsetRecord);
             if (!lastOffsetRecord.equals(offsetRecord)) {
+                logger.debug("flush offset, record: {}", offsetRecord);
                 if (offsetFile.length() >= FileUtils.ONE_MB * offsetFileMaxSize) {
                     nextFile();
                 }
diff --git a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
index 53eca71..9752de7 100644
--- a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
+++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
@@ -67,8 +67,9 @@ public class BufferStreamTestCase {
 
     private static class SegmentParse implements DataStreamReader.CallBack<TraceSegmentObject> {
 
-        @Override public void call(TraceSegmentObject message) {
+        @Override public boolean call(TraceSegmentObject message) {
             logger.info("segment parse: {}", message.getSpans(0).getSpanId());
+            return true;
         }
     }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index 3adfd1f..266e85d 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -24,15 +24,22 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
 import org.apache.skywalking.oap.server.receiver.trace.provider.handler.TraceSegmentServiceHandler;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
 
 /**
  * @author peng-yongsheng
  */
 public class TraceModuleProvider extends ModuleProvider {
 
+    private final TraceServiceModuleConfig moduleConfig;
+
+    public TraceModuleProvider() {
+        this.moduleConfig = new TraceServiceModuleConfig();
+    }
+
     @Override public String name() {
         return "default";
     }
@@ -42,7 +49,7 @@ public class TraceModuleProvider extends ModuleProvider {
     }
 
     @Override public ModuleConfig createConfigBeanIfAbsent() {
-        return null;
+        return moduleConfig;
     }
 
     @Override public void prepare() {
@@ -56,7 +63,11 @@ public class TraceModuleProvider extends ModuleProvider {
 
         GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
         try {
-            grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(getManager(), listenerManager));
+            SegmentParse segmentParse = new SegmentParse(getManager(), listenerManager);
+            grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentParse));
+
+            SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentParse, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
+            segmentParse.setStandardizationWorker(standardizationWorker);
         } catch (IOException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
similarity index 51%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
index 2ce7c97..578cdd0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
@@ -16,24 +16,17 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core;
+package org.apache.skywalking.oap.server.receiver.trace.provider;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 
 /**
  * @author peng-yongsheng
  */
-public class Const {
-    public static final int NONE = 0;
-    public static final String ID_SPLIT = "_";
-    public static final int NONE_SERVICE_ID = 1;
-    public static final int NONE_INSTANCE_ID = 1;
-    public static final int NONE_ENDPOINT_ID = 1;
-    public static final String NONE_ENDPOINT_NAME = "None";
-    public static final String USER_CODE = "User";
-    public static final String SEGMENT_SPAN_SPLIT = "S";
-    public static final String UNKNOWN = "Unknown";
-    public static final String EXCEPTION = "Exception";
-    public static final String EMPTY_STRING = "";
-    public static final String FILE_SUFFIX = "sw";
-    public static final int SPAN_TYPE_VIRTUAL = 9;
-    public static final String DOMAIN_OPERATION_NAME = "{domain}";
+public class TraceServiceModuleConfig extends ModuleConfig {
+    @Setter @Getter private String bufferPath;
+    @Setter @Getter private int bufferOffsetMaxFileSize;
+    @Setter @Getter private int bufferDataMaxFileSize;
+    @Setter @Getter private boolean bufferFileCleanWhenRestart;
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/TraceSegmentServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/TraceSegmentServiceHandler.java
index 6b153e4..908395a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/TraceSegmentServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/TraceSegmentServiceHandler.java
@@ -19,11 +19,9 @@
 package org.apache.skywalking.oap.server.receiver.trace.provider.handler;
 
 import io.grpc.stub.StreamObserver;
-import java.io.IOException;
 import org.apache.skywalking.apm.network.language.agent.*;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
 import org.slf4j.*;
 
 /**
@@ -33,12 +31,12 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
 
     private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
 
-    private final SegmentParse segmentParse;
     private final Boolean debug;
+    private final SegmentParse segmentParse;
 
-    public TraceSegmentServiceHandler(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) throws IOException {
-        this.segmentParse = new SegmentParse(moduleManager, listenerManager);
+    public TraceSegmentServiceHandler(SegmentParse segmentParse) {
         this.debug = System.getProperty("debug") != null;
+        this.segmentParse = segmentParse;
     }
 
     @Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 01bede2..78c6a78 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -19,8 +19,8 @@
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
 import java.util.*;
+import lombok.Setter;
 import org.apache.skywalking.apm.network.language.agent.*;
 import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -41,20 +41,19 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
     private final List<SpanListener> spanListeners;
     private final SegmentParserListenerManager listenerManager;
     private final SegmentCoreInfo segmentCoreInfo;
-    private final SegmentStandardizationWorker standardizationWorker;
+    @Setter private SegmentStandardizationWorker standardizationWorker;
 
-    public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) throws IOException {
+    public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
         this.moduleManager = moduleManager;
         this.listenerManager = listenerManager;
         this.spanListeners = new LinkedList<>();
         this.segmentCoreInfo = new SegmentCoreInfo();
         this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
         this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
-        this.standardizationWorker = new SegmentStandardizationWorker(moduleManager, listenerManager,this);
     }
 
-    @Override public void call(UpstreamSegment segment) {
-        parse(segment, Source.Buffer);
+    @Override public boolean call(UpstreamSegment segment) {
+        return parse(segment, Source.Buffer);
     }
 
     public boolean parse(UpstreamSegment segment, Source source) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index d0d617c..d696a36 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -20,14 +20,12 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standard
 
 import java.io.IOException;
 import java.util.*;
-
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.buffer.BufferStream;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
 import org.slf4j.*;
 
 /**
@@ -37,20 +35,18 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
 
     private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
 
-    private final DataCarrier<SegmentStandardization> dataCarrier;
     private final BufferStream<UpstreamSegment> stream;
 
-    public SegmentStandardizationWorker(ModuleManager moduleManager,
-                                        SegmentParserListenerManager listenerManager, SegmentParse segmentParse) throws IOException {
-        super(9999);
-        this.dataCarrier = new DataCarrier<>(1, 1024);
-        this.dataCarrier.consume(new Consumer(this), 1);
-
-        String directory = "/Users/pengys5/code/sky-walking/buffer-test";
-        BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(directory);
-//        builder.cleanWhenRestart(true);
-        builder.dataFileMaxSize(50);
-        builder.offsetFileMaxSize(10);
+    public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
+        int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
+        super(Integer.MAX_VALUE);
+        DataCarrier<SegmentStandardization> dataCarrier = new DataCarrier<>(1, 1024);
+        dataCarrier.consume(new Consumer(this), 1);
+
+        BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path);
+        builder.cleanWhenRestart(cleanWhenRestart);
+        builder.dataFileMaxSize(dataFileMaxSize);
+        builder.offsetFileMaxSize(offsetFileMaxSize);
         builder.parser(UpstreamSegment.parser());
         builder.callBack(segmentParse);
 
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
new file mode 100644
index 0000000..d19be0e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import io.grpc.*;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class AgentDataMock {
+
+    private static final Logger logger = LoggerFactory.getLogger(AgentDataMock.class);
+
+    private static boolean IS_COMPLETED = false;
+
+    public static void main(String[] args) throws InterruptedException {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
+
+        RegisterMock registerMock = new RegisterMock();
+        registerMock.mock(channel);
+
+        StreamObserver<UpstreamSegment> streamObserver = createStreamObserver();
+
+        UniqueId.Builder globalTraceId = UniqueIdBuilder.INSTANCE.create();
+        long startTimestamp = System.currentTimeMillis();
+
+        ConsumerMock consumerMock = new ConsumerMock();
+        UniqueId.Builder consumerSegmentId = UniqueIdBuilder.INSTANCE.create();
+        consumerMock.mock(streamObserver, globalTraceId, consumerSegmentId, startTimestamp, true);
+
+        ProviderMock providerMock = new ProviderMock();
+        UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create();
+        providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, true);
+
+        streamObserver.onCompleted();
+        while (!IS_COMPLETED) {
+            TimeUnit.MILLISECONDS.sleep(500);
+        }
+    }
+
+    private static StreamObserver<UpstreamSegment> createStreamObserver() {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
+        TraceSegmentServiceGrpc.TraceSegmentServiceStub stub = TraceSegmentServiceGrpc.newStub(channel);
+        return stub.collect(new StreamObserver<Downstream>() {
+            @Override public void onNext(Downstream downstream) {
+            }
+
+            @Override public void onError(Throwable throwable) {
+            }
+
+            @Override public void onCompleted() {
+                IS_COMPLETED = true;
+            }
+        });
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ConsumerMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ConsumerMock.java
new file mode 100644
index 0000000..9d33e91
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ConsumerMock.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author peng-yongsheng
+ */
+class ConsumerMock {
+
+    void mock(StreamObserver<UpstreamSegment> streamObserver, UniqueId.Builder globalTraceId,
+        UniqueId.Builder segmentId, long startTimestamp, boolean isPrepare) {
+        UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
+        upstreamSegment.addGlobalTraceIds(globalTraceId);
+        upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, isPrepare));
+
+        streamObserver.onNext(upstreamSegment.build());
+    }
+
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId, boolean isPrepare) {
+        TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
+        segment.setTraceSegmentId(segmentId);
+        segment.setApplicationId(1);
+        segment.setApplicationInstanceId(1);
+        segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
+        segment.addSpans(createLocalSpan(startTimestamp, isPrepare));
+        segment.addSpans(createMqEntrySpan(startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan(startTimestamp, isPrepare));
+        segment.addSpans(createMqEntrySpan2(startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan2(startTimestamp, isPrepare));
+
+        return segment.build().toByteString();
+    }
+
+    private SpanObject.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(0);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.Http);
+        span.setParentSpanId(-1);
+        span.setStartTime(startTimestamp);
+        span.setEndTime(startTimestamp + 2000);
+        span.setComponentId(ComponentsDefine.TOMCAT.getId());
+        if (isPrepare) {
+            span.setOperationName("/dubbox-case/case/dubbox-rest");
+        } else {
+            span.setOperationNameId(1);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createLocalSpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(1);
+        span.setSpanType(SpanType.Local);
+        span.setParentSpanId(0);
+        span.setStartTime(startTimestamp + 100);
+        span.setEndTime(startTimestamp + 1900);
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.Local.do");
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createMqEntrySpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(2);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 110);
+        span.setEndTime(startTimestamp + 1800);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.RocketMQ");
+        } else {
+            span.setOperationNameId(3);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(3);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.RPCFramework);
+        span.setParentSpanId(2);
+        span.setStartTime(startTimestamp + 120);
+        span.setEndTime(startTimestamp + 1780);
+        span.setComponentId(ComponentsDefine.DUBBO.getId());
+        if (isPrepare) {
+            span.setPeer("172.25.0.4:20880");
+            span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
+        } else {
+            span.setOperationNameId(4);
+            span.setPeerId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createMqEntrySpan2(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(4);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 110);
+        span.setEndTime(startTimestamp + 1800);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.RocketMQ");
+        } else {
+            span.setOperationNameId(3);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createExitSpan2(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(5);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.RPCFramework);
+        span.setParentSpanId(4);
+        span.setStartTime(startTimestamp + 120);
+        span.setEndTime(startTimestamp + 1780);
+        span.setComponentId(ComponentsDefine.DUBBO.getId());
+        if (isPrepare) {
+            span.setPeer("172.25.0.4:20880");
+            span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
+        } else {
+            span.setOperationNameId(4);
+            span.setPeerId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ProviderMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ProviderMock.java
new file mode 100644
index 0000000..5b353c9
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ProviderMock.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author peng-yongsheng
+ */
+class ProviderMock {
+
+    void mock(StreamObserver<UpstreamSegment> streamObserver, UniqueId.Builder globalTraceId,
+        UniqueId.Builder segmentId, UniqueId.Builder parentTraceSegmentId, long startTimestamp, boolean isPrepare) {
+        UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
+        upstreamSegment.addGlobalTraceIds(globalTraceId);
+        upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, parentTraceSegmentId, isPrepare));
+
+        streamObserver.onNext(upstreamSegment.build());
+    }
+
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
+        UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
+        segment.setTraceSegmentId(segmentId);
+        segment.setApplicationId(2);
+        segment.setApplicationInstanceId(2);
+        segment.addSpans(createExitSpan(startTimestamp, isPrepare));
+        segment.addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare));
+
+        return segment.build().toByteString();
+    }
+
+    private TraceSegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        TraceSegmentReference.Builder reference = TraceSegmentReference.newBuilder();
+        reference.setParentTraceSegmentId(parentTraceSegmentId);
+        reference.setParentApplicationInstanceId(1);
+        reference.setParentSpanId(1);
+        reference.setEntryApplicationInstanceId(1);
+        reference.setRefType(RefType.CrossProcess);
+
+        if (isPrepare) {
+            reference.setParentServiceName("/dubbox-case/case/dubbox-rest");
+            reference.setNetworkAddress("172.25.0.4:20880");
+            reference.setEntryServiceName("/dubbox-case/case/dubbox-rest");
+        } else {
+            reference.setParentServiceId(1);
+            reference.setNetworkAddressId(2);
+            reference.setEntryServiceId(1);
+        }
+        return reference;
+    }
+
+    private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(1);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.Database);
+        span.setParentSpanId(0);
+        span.setStartTime(startTimestamp + 510);
+        span.setEndTime(startTimestamp + 1490);
+        span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId());
+        span.setIsError(true);
+
+        if (isPrepare) {
+            span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
+            span.setPeer("localhost:27017");
+        } else {
+            span.setOperationNameId(5);
+            span.setPeerId(1);
+        }
+        return span;
+    }
+
+    private SpanObject.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(0);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.RPCFramework);
+        span.setParentSpanId(-1);
+        span.setStartTime(startTimestamp + 500);
+        span.setEndTime(startTimestamp + 1500);
+        span.setComponentId(ComponentsDefine.DUBBO.getId());
+        span.setIsError(false);
+        span.addRefs(createReference(uniqueId, isPrepare));
+
+        if (isPrepare) {
+            span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
+        } else {
+            span.setOperationNameId(6);
+        }
+        return span;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java
new file mode 100644
index 0000000..3e9915a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import io.grpc.ManagedChannel;
+import java.util.UUID;
+import java.util.concurrent.*;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import org.joda.time.DateTime;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class RegisterMock {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterMock.class);
+
+    private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
+    private InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
+    private ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
+
+    void mock(ManagedChannel channel) throws InterruptedException {
+        applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
+        instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
+        serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
+        registerConsumer();
+        registerProvider();
+    }
+
+    private void registerConsumer() throws InterruptedException {
+        Application.Builder application = Application.newBuilder();
+        application.setApplicationCode("dubbox-consumer");
+
+        ApplicationMapping applicationMapping;
+        do {
+            applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
+            logger.debug("application id: {}", applicationMapping.getApplication().getValue());
+            TimeUnit.MILLISECONDS.sleep(20);
+        }
+        while (applicationMapping.getApplication().getValue() == 0);
+
+        ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
+        instance.setApplicationId(applicationMapping.getApplication().getValue());
+        instance.setAgentUUID(UUID.randomUUID().toString());
+        instance.setRegisterTime(new DateTime("2017-01-01T00:01:01.001").getMillis());
+
+        OSInfo.Builder osInfo = OSInfo.newBuilder();
+        osInfo.setHostname("pengys");
+        osInfo.setOsName("MacOS XX");
+        osInfo.setProcessNo(1001);
+        osInfo.addIpv4S("10.0.0.3");
+        osInfo.addIpv4S("10.0.0.4");
+        instance.setOsinfo(osInfo);
+
+        ApplicationInstanceMapping instanceMapping;
+        do {
+            instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
+            logger.debug("instance id: {}", instanceMapping.getApplicationInstanceId());
+            TimeUnit.MILLISECONDS.sleep(20);
+        }
+        while (instanceMapping.getApplicationInstanceId() == 0);
+
+        ServiceNameCollection.Builder serviceNameCollection = ServiceNameCollection.newBuilder();
+        ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder();
+        serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue());
+        serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
+        serviceNameElement.setSrcSpanType(SpanType.Exit);
+        serviceNameCollection.addElements(serviceNameElement);
+
+//        registerServiceName(serviceNameCollection);
+
+//        heartBeatScheduled(instanceMapping.getApplicationInstanceId());
+    }
+
+    private void registerProvider() throws InterruptedException {
+        Application.Builder application = Application.newBuilder();
+        application.setApplicationCode("dubbox-provider");
+
+        ApplicationMapping applicationMapping;
+        do {
+            applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
+            logger.debug("application id: {}", applicationMapping.getApplication().getValue());
+            Thread.sleep(20);
+        }
+        while (applicationMapping.getApplication().getValue() == 0);
+
+        ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
+        instance.setApplicationId(applicationMapping.getApplication().getValue());
+        instance.setAgentUUID(UUID.randomUUID().toString());
+        instance.setRegisterTime(new DateTime("2017-01-01T00:01:01.001").getMillis());
+
+        OSInfo.Builder osInfo = OSInfo.newBuilder();
+        osInfo.setHostname("peng-yongsheng");
+        osInfo.setOsName("MacOS X");
+        osInfo.setProcessNo(1000);
+        osInfo.addIpv4S("10.0.0.1");
+        osInfo.addIpv4S("10.0.0.2");
+        instance.setOsinfo(osInfo);
+
+        ApplicationInstanceMapping instanceMapping;
+        do {
+            instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
+            logger.debug("instance id: {}", instanceMapping.getApplicationInstanceId());
+            Thread.sleep(20);
+        }
+        while (instanceMapping.getApplicationInstanceId() == 0);
+
+        ServiceNameCollection.Builder serviceNameCollection = ServiceNameCollection.newBuilder();
+        ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder();
+        serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue());
+        serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
+        serviceNameElement.setSrcSpanType(SpanType.Entry);
+        serviceNameCollection.addElements(serviceNameElement);
+
+//        registerServiceName(serviceNameCollection);
+
+//        heartBeatScheduled(instanceMapping.getApplicationInstanceId());
+    }
+
+    private void registerServiceName(ServiceNameCollection.Builder serviceNameCollection) throws InterruptedException {
+        ServiceNameMappingCollection serviceNameMappingCollection;
+        do {
+            logger.debug("register service name: {}", serviceNameCollection.getElements(0).getServiceName());
+            serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
+            logger.debug("service name mapping collection size: {}", serviceNameMappingCollection.getElementsCount());
+            if (serviceNameMappingCollection.getElementsCount() > 0) {
+                logger.debug("service id: {}", serviceNameMappingCollection.getElements(0).getServiceId());
+            }
+            Thread.sleep(20);
+        }
+        while (serviceNameMappingCollection.getElementsCount() == 0 || serviceNameMappingCollection.getElements(0).getServiceId() == 0);
+    }
+
+    private void heartBeatScheduled(int instanceId) {
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+            new RunnableWithExceptionProtection(() -> heartBeat(instanceId),
+                t -> logger.error("instance heart beat scheduled error.", t)), 4, 1, TimeUnit.SECONDS);
+    }
+
+    private void heartBeat(int instanceId) {
+        long now = System.currentTimeMillis();
+        logger.debug("instance heart beat, instance id: {}, time: {}", instanceId, now);
+        ApplicationInstanceHeartbeat.Builder heartbeat = ApplicationInstanceHeartbeat.newBuilder();
+        heartbeat.setApplicationInstanceId(instanceId);
+        heartbeat.setHeartbeatTime(now);
+        instanceDiscoveryServiceBlockingStub.heartbeat(heartbeat.build());
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
similarity index 51%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
index 2ce7c97..e64adff 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
@@ -16,24 +16,24 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core;
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
 
 /**
  * @author peng-yongsheng
  */
-public class Const {
-    public static final int NONE = 0;
-    public static final String ID_SPLIT = "_";
-    public static final int NONE_SERVICE_ID = 1;
-    public static final int NONE_INSTANCE_ID = 1;
-    public static final int NONE_ENDPOINT_ID = 1;
-    public static final String NONE_ENDPOINT_NAME = "None";
-    public static final String USER_CODE = "User";
-    public static final String SEGMENT_SPAN_SPLIT = "S";
-    public static final String UNKNOWN = "Unknown";
-    public static final String EXCEPTION = "Exception";
-    public static final String EMPTY_STRING = "";
-    public static final String FILE_SUFFIX = "sw";
-    public static final int SPAN_TYPE_VIRTUAL = 9;
-    public static final String DOMAIN_OPERATION_NAME = "{domain}";
+public enum UniqueIdBuilder {
+    INSTANCE;
+
+    private AtomicLong idPart = new AtomicLong(1);
+
+    UniqueId.Builder create() {
+        UniqueId.Builder uniqueId = UniqueId.newBuilder();
+        uniqueId.addIdParts(idPart.getAndIncrement());
+        uniqueId.addIdParts(idPart.getAndIncrement());
+        uniqueId.addIdParts(idPart.getAndIncrement());
+        return uniqueId;
+    }
 }
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/resources/log4j2.xml
similarity index 70%
copy from oap-server/server-starter/src/main/resources/log4j2.xml
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/resources/log4j2.xml
index 65c5203..6eb5b3f 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/resources/log4j2.xml
@@ -17,20 +17,14 @@
   ~
   -->
 
-<Configuration status="INFO">
+<Configuration status="DEBUG">
     <Appenders>
         <Console name="Console" target="SYSTEM_OUT">
             <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
         </Console>
     </Appenders>
     <Loggers>
-        <logger name="org.eclipse.jetty" level="INFO"/>
-        <logger name="org.apache.zookeeper" level="INFO"/>
-        <logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
-        <logger name="io.grpc.netty" level="INFO"/>
-        <logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
-        <logger name="org.apache.skywalking.oap.server.core.remote" level="DEBUG"/>
-        <Root level="INFO">
+        <Root level="DEBUG">
             <AppenderRef ref="Console"/>
         </Root>
     </Loggers>
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index f7e244a..9290a93 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -53,6 +53,10 @@ receiver-register:
   default:
 receiver-trace:
   default:
+    bufferPath: ../buffer/  # Path to trace buffer files, suggest to use absolute path
+    bufferOffsetMaxFileSize: 100 # Unit is MB
+    bufferDataMaxFileSize: 500 # Unit is MB
+    bufferFileCleanWhenRestart: false
 receiver-jvm:
   default:
 service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-starter/src/main/resources/log4j2.xml
index 65c5203..2da2d13 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-starter/src/main/resources/log4j2.xml
@@ -27,10 +27,13 @@
         <logger name="org.eclipse.jetty" level="INFO"/>
         <logger name="org.apache.zookeeper" level="INFO"/>
         <logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
+        <logger name="org.elasticsearch.client.RestClient" level="INFO"/>
         <logger name="io.grpc.netty" level="INFO"/>
+        <logger name="io.netty" level="INFO"/>
+        <logger name="org.apache.http" level="INFO"/>
         <logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
         <logger name="org.apache.skywalking.oap.server.core.remote" level="DEBUG"/>
-        <Root level="INFO">
+        <Root level="DEBUG">
             <AppenderRef ref="Console"/>
         </Root>
     </Loggers>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
index 1790854..13c6711 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
@@ -91,7 +91,7 @@ public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
 
         int id = (int)agg.getValue();
         if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
-            return 1;
+            return 0;
         } else {
             return id;
         }