You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by pe...@apache.org on 2018/09/09 07:30:46 UTC
[incubator-skywalking] branch master updated: Streaming analysis topology test success. (#1646)
This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new bc57125 Streaming analysis topology test success. (#1646)
bc57125 is described below
commit bc571259b2bbf11487fa9974c919f5dfc60e0650
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Sun Sep 9 15:30:43 2018 +0800
Streaming analysis topology test success. (#1646)
* Add generate indicator and dispatcher about service topology but not test.
* Delete call type from service relation.
* Streaming analysis topology test success.
* Fixed the check style error.
---
.../apache/skywalking/oap/server/core/Const.java | 1 -
.../skywalking/oap/server/core/CoreModule.java | 3 +
.../oap/server/core/CoreModuleProvider.java | 6 +
.../analysis/worker/IndicatorPersistentWorker.java | 12 ++
.../core/analysis/worker/IndicatorProcess.java | 3 +
.../server/core/cache/EndpointInventoryCache.java | 17 +--
.../core/cache/NetworkAddressInventoryCache.java | 23 +--
.../core/cache/ServiceInstanceInventoryCache.java | 30 +---
.../server/core/cache/ServiceInventoryCache.java | 33 ++--
.../register/worker/RegisterPersistentWorker.java | 2 +-
.../oap/server/core/storage/PersistenceTimer.java | 99 ++++++++++++
.../server/library/buffer/DataStreamReader.java | 20 ++-
.../oap/server/library/buffer/OffsetStream.java | 2 +-
.../library/buffer/BufferStreamTestCase.java | 3 +-
.../trace/provider/TraceModuleProvider.java | 17 ++-
.../trace/provider/TraceServiceModuleConfig.java} | 25 ++--
.../handler/TraceSegmentServiceHandler.java | 10 +-
.../trace/provider/parser/SegmentParse.java | 11 +-
.../SegmentStandardizationWorker.java | 26 ++--
.../server/receiver/trace/mock/AgentDataMock.java | 76 ++++++++++
.../server/receiver/trace/mock/ConsumerMock.java | 164 ++++++++++++++++++++
.../server/receiver/trace/mock/ProviderMock.java | 112 ++++++++++++++
.../server/receiver/trace/mock/RegisterMock.java | 166 +++++++++++++++++++++
.../receiver/trace/mock/UniqueIdBuilder.java} | 32 ++--
.../src/test}/resources/log4j2.xml | 10 +-
.../src/main/resources/application.yml | 4 +
.../server-starter/src/main/resources/log4j2.xml | 5 +-
.../plugin/elasticsearch/base/RegisterEsDAO.java | 2 +-
28 files changed, 765 insertions(+), 149 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
index 2ce7c97..ca3583a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
@@ -33,7 +33,6 @@ public class Const {
public static final String UNKNOWN = "Unknown";
public static final String EXCEPTION = "Exception";
public static final String EMPTY_STRING = "";
- public static final String FILE_SUFFIX = "sw";
public static final int SPAN_TYPE_VIRTUAL = 9;
public static final String DOMAIN_OPERATION_NAME = "{domain}";
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 1052187..2d12a56 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core;
import java.util.*;
import org.apache.skywalking.oap.server.core.cache.*;
+import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
@@ -42,6 +43,8 @@ public class CoreModule extends ModuleDefine {
@Override public Class[] services() {
List<Class> classes = new ArrayList<>();
+ classes.add(IComponentLibraryCatalogService.class);
+
addServerInterface(classes);
addReceiverInterface(classes);
addInsideService(classes);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index d01de7d..dc32872 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.Indic
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
@@ -30,6 +31,7 @@ import org.apache.skywalking.oap.server.core.remote.annotation.*;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.library.module.*;
@@ -85,6 +87,8 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
+ this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());
+
this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
@@ -136,6 +140,8 @@ public class CoreModuleProvider extends ModuleProvider {
RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
+
+ PersistenceTimer.INSTANCE.start(getManager());
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 3a5bbe5..bca9752 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -73,6 +73,18 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
cacheData(input);
}
+ public boolean flushAndSwitch() {
+ boolean isSwitch;
+ try {
+ if (isSwitch = getCache().trySwitchPointer()) {
+ getCache().switchPointer();
+ }
+ } finally {
+ getCache().trySwitchPointerFinally();
+ }
+ return isSwitch;
+ }
+
public final List<?> buildBatchCollection() {
List<?> batchCollection = new LinkedList<>();
try {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
index 12cfce2..b87a83c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
+import lombok.Getter;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.*;
@@ -33,6 +34,7 @@ public enum IndicatorProcess {
INSTANCE;
private Map<Class<? extends Indicator>, IndicatorAggregateWorker> entryWorkers = new HashMap<>();
+ @Getter private List<IndicatorPersistentWorker> persistentWorkers = new ArrayList<>();
public void in(Indicator indicator) {
entryWorkers.get(indicator.getClass()).in(indicator);
@@ -56,6 +58,7 @@ public enum IndicatorProcess {
IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
1000, moduleManager, indicatorDAO, alarmNotifyWorker);
WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+ persistentWorkers.add(persistentWorker);
IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, persistentWorker);
WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointInventoryCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointInventoryCache.java
index 8cce526..5e7ff65 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointInventoryCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointInventoryCache.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.cache;
import com.google.common.cache.*;
+import java.util.Objects;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -56,14 +57,9 @@ public class EndpointInventoryCache implements Service {
public int getEndpointId(int serviceId, String endpointName) {
String id = EndpointInventory.buildId(serviceId, endpointName);
- int endpointId = Const.NONE;
- try {
- endpointId = endpointNameCache.get(id, () -> getCacheDAO().getEndpointId(serviceId, endpointName));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ Integer endpointId = endpointNameCache.getIfPresent(id);
- if (endpointId == Const.NONE) {
+ if (Objects.isNull(endpointId) || endpointId == Const.NONE) {
endpointId = getCacheDAO().getEndpointId(serviceId, endpointName);
if (endpointId != Const.NONE) {
endpointNameCache.put(id, endpointId);
@@ -73,12 +69,7 @@ public class EndpointInventoryCache implements Service {
}
public EndpointInventory get(int endpointId) {
- EndpointInventory endpointInventory = null;
- try {
- endpointInventory = endpointIdCache.get(endpointId, () -> getCacheDAO().get(endpointId));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ EndpointInventory endpointInventory = endpointIdCache.getIfPresent(endpointId);
if (isNull(endpointInventory)) {
endpointInventory = getCacheDAO().get(endpointId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/NetworkAddressInventoryCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/NetworkAddressInventoryCache.java
index 91ed3d9..bbcd66e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/NetworkAddressInventoryCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/NetworkAddressInventoryCache.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.cache;
import com.google.common.cache.*;
+import java.util.Objects;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -54,30 +55,20 @@ public class NetworkAddressInventoryCache implements Service {
}
public int getAddressId(String networkAddress) {
- int addressId = Const.NONE;
- try {
- addressId = networkAddressCache.get(NetworkAddressInventory.buildId(networkAddress), () -> getCacheDAO().getAddressId(networkAddress));
+ Integer addressId = networkAddressCache.getIfPresent(NetworkAddressInventory.buildId(networkAddress));
- if (addressId == Const.NONE) {
- addressId = getCacheDAO().getAddressId(networkAddress);
- if (addressId != Const.NONE) {
- networkAddressCache.put(NetworkAddressInventory.buildId(networkAddress), addressId);
- }
+ if (Objects.isNull(addressId) || addressId == Const.NONE) {
+ addressId = getCacheDAO().getAddressId(networkAddress);
+ if (addressId != Const.NONE) {
+ networkAddressCache.put(NetworkAddressInventory.buildId(networkAddress), addressId);
}
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
}
return addressId;
}
public NetworkAddressInventory get(int addressId) {
- NetworkAddressInventory networkAddress = null;
- try {
- networkAddress = addressIdCache.get(addressId, () -> getCacheDAO().get(addressId));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ NetworkAddressInventory networkAddress = addressIdCache.getIfPresent(addressId);
if (isNull(networkAddress)) {
networkAddress = getCacheDAO().get(addressId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInstanceInventoryCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInstanceInventoryCache.java
index 0f07b37..777c574 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInstanceInventoryCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInstanceInventoryCache.java
@@ -20,8 +20,9 @@ package org.apache.skywalking.oap.server.core.cache;
import com.google.common.cache.*;
import java.util.Objects;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
@@ -50,18 +51,13 @@ public class ServiceInstanceInventoryCache implements Service {
private IServiceInstanceInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
- this.cacheDAO = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
+ this.cacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
}
return this.cacheDAO;
}
public ServiceInstanceInventory get(int serviceInstanceId) {
- ServiceInstanceInventory serviceInstanceInventory = null;
- try {
- serviceInstanceInventory = serviceInstanceIdCache.get(serviceInstanceId, () -> getCacheDAO().get(serviceInstanceId));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ ServiceInstanceInventory serviceInstanceInventory = serviceInstanceIdCache.getIfPresent(serviceInstanceId);
if (Objects.isNull(serviceInstanceInventory)) {
serviceInstanceInventory = getCacheDAO().get(serviceInstanceId);
@@ -73,14 +69,9 @@ public class ServiceInstanceInventoryCache implements Service {
}
public int getServiceInstanceId(int serviceId, String serviceInstanceName) {
- int serviceInstanceId = Const.NONE;
- try {
- serviceInstanceId = serviceInstanceNameCache.get(ServiceInstanceInventory.buildId(serviceId, serviceInstanceName), () -> getCacheDAO().getServiceInstanceId(serviceId, serviceInstanceName));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ Integer serviceInstanceId = serviceInstanceNameCache.getIfPresent(ServiceInstanceInventory.buildId(serviceId, serviceInstanceName));
- if (serviceInstanceId == Const.NONE) {
+ if (Objects.isNull(serviceInstanceId) || serviceInstanceId == Const.NONE) {
serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, serviceInstanceName);
if (serviceId != Const.NONE) {
serviceInstanceNameCache.put(ServiceInstanceInventory.buildId(serviceId, serviceInstanceName), serviceInstanceId);
@@ -90,14 +81,9 @@ public class ServiceInstanceInventoryCache implements Service {
}
public int getServiceInstanceId(int serviceId, int addressId) {
- int serviceInstanceId = Const.NONE;
- try {
- serviceInstanceId = addressIdCache.get(ServiceInstanceInventory.buildId(serviceId, addressId), () -> getCacheDAO().getServiceInstanceId(serviceId, addressId));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ Integer serviceInstanceId = addressIdCache.getIfPresent(ServiceInstanceInventory.buildId(serviceId, addressId));
- if (serviceInstanceId == Const.NONE) {
+ if (Objects.isNull(serviceInstanceId) || serviceInstanceId == Const.NONE) {
serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, addressId);
if (serviceId != Const.NONE) {
addressIdCache.put(ServiceInstanceInventory.buildId(serviceId, addressId), serviceInstanceId);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInventoryCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInventoryCache.java
index 4213aca..8571a63 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInventoryCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ServiceInventoryCache.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.cache;
import com.google.common.cache.*;
+import java.util.Objects;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
@@ -54,14 +55,9 @@ public class ServiceInventoryCache implements Service {
}
public int getServiceId(String serviceName) {
- int serviceId = Const.NONE;
- try {
- serviceId = serviceNameCache.get(ServiceInventory.buildId(serviceName), () -> getCacheDAO().getServiceId(serviceName));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ Integer serviceId = serviceNameCache.getIfPresent(ServiceInventory.buildId(serviceName));
- if (serviceId == Const.NONE) {
+ if (Objects.isNull(serviceId) || serviceId == Const.NONE) {
serviceId = getCacheDAO().getServiceId(serviceName);
if (serviceId != Const.NONE) {
serviceNameCache.put(ServiceInventory.buildId(serviceName), serviceId);
@@ -71,14 +67,9 @@ public class ServiceInventoryCache implements Service {
}
public int getServiceId(int addressId) {
- int serviceId = Const.NONE;
- try {
- serviceId = addressIdCache.get(ServiceInventory.buildId(addressId), () -> getCacheDAO().getServiceId(addressId));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ Integer serviceId = addressIdCache.getIfPresent(ServiceInventory.buildId(addressId));
- if (serviceId == Const.NONE) {
+ if (Objects.isNull(serviceId) || serviceId == Const.NONE) {
serviceId = getCacheDAO().getServiceId(addressId);
if (serviceId != Const.NONE) {
addressIdCache.put(ServiceInventory.buildId(addressId), serviceId);
@@ -88,12 +79,7 @@ public class ServiceInventoryCache implements Service {
}
public ServiceInventory get(int serviceId) {
- ServiceInventory serviceInventory = null;
- try {
- serviceInventory = serviceIdCache.get(serviceId, () -> getCacheDAO().get(serviceId));
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
+ ServiceInventory serviceInventory = serviceIdCache.getIfPresent(serviceId);
if (isNull(serviceInventory)) {
serviceInventory = getCacheDAO().get(serviceId);
@@ -101,6 +87,13 @@ public class ServiceInventoryCache implements Service {
serviceIdCache.put(serviceId, serviceInventory);
}
}
+
+ if (logger.isDebugEnabled()) {
+ if (Objects.isNull(serviceInventory)) {
+ logger.debug("service id {} not find in cache.", serviceId);
+ }
+ }
+
return serviceInventory;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 2fb8753..eb94d66 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -66,7 +66,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
registerDAO.forceUpdate(modelName, newSource);
} else {
int sequence = registerDAO.max(modelName);
- source.setSequence(sequence);
+ source.setSequence(sequence + 1);
registerDAO.forceInsert(modelName, source);
}
} catch (Throwable t) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
new file mode 100644
index 0000000..7271f04
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.*;
+import java.util.concurrent.*;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum PersistenceTimer {
+ INSTANCE;
+
+ private static final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
+
+ private Boolean isStarted = false;
+ private final Boolean debug;
+
+ PersistenceTimer() {
+ this.debug = System.getProperty("debug") != null;
+ }
+
+ public void start(ModuleManager moduleManager) {
+ logger.info("persistence timer start");
+ //TODO timer value config
+// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
+ final long timeInterval = 3;
+ IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
+
+ if (!isStarted) {
+ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+ new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
+ t -> logger.error("Extract data and save failure.", t)), 1, timeInterval, TimeUnit.SECONDS);
+
+ this.isStarted = true;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void extractDataAndSave(IBatchDAO batchDAO) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Extract data and save");
+ }
+
+ long startTime = System.currentTimeMillis();
+ try {
+ List batchAllCollection = new LinkedList();
+ IndicatorProcess.INSTANCE.getPersistentWorkers().forEach(worker -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("extract {} worker data and save", worker.getClass().getName());
+ }
+
+ if (worker.flushAndSwitch()) {
+ List<?> batchCollection = worker.buildBatchCollection();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
+ }
+ batchAllCollection.addAll(batchCollection);
+ }
+ });
+
+ if (debug) {
+ logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+ }
+ batchDAO.batchPersistence(batchAllCollection);
+ } catch (Throwable e) {
+ logger.error(e.getMessage(), e);
+ } finally {
+ if (logger.isDebugEnabled()) {
+ logger.debug("persistence data save finish");
+ }
+ }
+
+ if (debug) {
+ logger.info("batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+ }
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
index 049f863..79b2c8e 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
@@ -98,6 +98,10 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
}
private void read() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Read buffer data");
+ }
+
try {
if (readOffset.getOffset() == readingFile.length() && !readOffset.isCurrentWriteFile()) {
FileUtils.forceDelete(readingFile);
@@ -108,7 +112,19 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
if (messageType != null) {
- callBack.call(messageType);
+ int i = 0;
+ while (!callBack.call(messageType)) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(500);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage());
+ }
+
+ i++;
+ if (i == 10) {
+ break;
+ }
+ }
final int serialized = messageType.getSerializedSize();
final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
readOffset.setOffset(readOffset.getOffset() + offset);
@@ -120,6 +136,6 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
}
public interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
- void call(MESSAGE_TYPE message);
+ boolean call(MESSAGE_TYPE message);
}
}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
index 9fb4ed5..a20f7c9 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
@@ -84,8 +84,8 @@ class OffsetStream {
void flush() {
try {
String offsetRecord = offset.serialize();
- logger.debug("flush offset, record: {}", offsetRecord);
if (!lastOffsetRecord.equals(offsetRecord)) {
+ logger.debug("flush offset, record: {}", offsetRecord);
if (offsetFile.length() >= FileUtils.ONE_MB * offsetFileMaxSize) {
nextFile();
}
diff --git a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
index 53eca71..9752de7 100644
--- a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
+++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
@@ -67,8 +67,9 @@ public class BufferStreamTestCase {
private static class SegmentParse implements DataStreamReader.CallBack<TraceSegmentObject> {
- @Override public void call(TraceSegmentObject message) {
+ @Override public boolean call(TraceSegmentObject message) {
logger.info("segment parse: {}", message.getSpans(0).getSpanId());
+ return true;
}
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index 3adfd1f..266e85d 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -24,15 +24,22 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.TraceSegmentServiceHandler;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
/**
* @author peng-yongsheng
*/
public class TraceModuleProvider extends ModuleProvider {
+ private final TraceServiceModuleConfig moduleConfig;
+
+ public TraceModuleProvider() {
+ this.moduleConfig = new TraceServiceModuleConfig();
+ }
+
@Override public String name() {
return "default";
}
@@ -42,7 +49,7 @@ public class TraceModuleProvider extends ModuleProvider {
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
- return null;
+ return moduleConfig;
}
@Override public void prepare() {
@@ -56,7 +63,11 @@ public class TraceModuleProvider extends ModuleProvider {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
try {
- grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(getManager(), listenerManager));
+ SegmentParse segmentParse = new SegmentParse(getManager(), listenerManager);
+ grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentParse));
+
+ SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentParse, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
+ segmentParse.setStandardizationWorker(standardizationWorker);
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
similarity index 51%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
index 2ce7c97..578cdd0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java
@@ -16,24 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.core;
+package org.apache.skywalking.oap.server.receiver.trace.provider;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
-public class Const {
- public static final int NONE = 0;
- public static final String ID_SPLIT = "_";
- public static final int NONE_SERVICE_ID = 1;
- public static final int NONE_INSTANCE_ID = 1;
- public static final int NONE_ENDPOINT_ID = 1;
- public static final String NONE_ENDPOINT_NAME = "None";
- public static final String USER_CODE = "User";
- public static final String SEGMENT_SPAN_SPLIT = "S";
- public static final String UNKNOWN = "Unknown";
- public static final String EXCEPTION = "Exception";
- public static final String EMPTY_STRING = "";
- public static final String FILE_SUFFIX = "sw";
- public static final int SPAN_TYPE_VIRTUAL = 9;
- public static final String DOMAIN_OPERATION_NAME = "{domain}";
+@Getter
+@Setter
+public class TraceServiceModuleConfig extends ModuleConfig {
+    // Directory handed to the segment buffer stream builder.
+    private String bufferPath;
+    // Size limit passed to the buffer's offsetFileMaxSize setting.
+    private int bufferOffsetMaxFileSize;
+    // Size limit passed to the buffer's dataFileMaxSize setting.
+    private int bufferDataMaxFileSize;
+    // Passed to the buffer's cleanWhenRestart setting.
+    private boolean bufferFileCleanWhenRestart;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/TraceSegmentServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/TraceSegmentServiceHandler.java
index 6b153e4..908395a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/TraceSegmentServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/TraceSegmentServiceHandler.java
@@ -19,11 +19,9 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.handler;
import io.grpc.stub.StreamObserver;
-import java.io.IOException;
import org.apache.skywalking.apm.network.language.agent.*;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.slf4j.*;
/**
@@ -33,12 +31,12 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
- private final SegmentParse segmentParse;
private final Boolean debug;
+ private final SegmentParse segmentParse;
- public TraceSegmentServiceHandler(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) throws IOException {
- this.segmentParse = new SegmentParse(moduleManager, listenerManager);
+ public TraceSegmentServiceHandler(SegmentParse segmentParse) {
this.debug = System.getProperty("debug") != null;
+ this.segmentParse = segmentParse;
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 01bede2..78c6a78 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -19,8 +19,8 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
import java.util.*;
+import lombok.Setter;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -41,20 +41,19 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
- private final SegmentStandardizationWorker standardizationWorker;
+ @Setter private SegmentStandardizationWorker standardizationWorker;
- public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) throws IOException {
+ public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
this.segmentCoreInfo = new SegmentCoreInfo();
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
- this.standardizationWorker = new SegmentStandardizationWorker(moduleManager, listenerManager,this);
}
- @Override public void call(UpstreamSegment segment) {
- parse(segment, Source.Buffer);
+ @Override public boolean call(UpstreamSegment segment) {
+ return parse(segment, Source.Buffer);
}
public boolean parse(UpstreamSegment segment, Source source) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index d0d617c..d696a36 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -20,14 +20,12 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standard
import java.io.IOException;
import java.util.*;
-
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.buffer.BufferStream;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.slf4j.*;
/**
@@ -37,20 +35,18 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
- private final DataCarrier<SegmentStandardization> dataCarrier;
private final BufferStream<UpstreamSegment> stream;
- public SegmentStandardizationWorker(ModuleManager moduleManager,
- SegmentParserListenerManager listenerManager, SegmentParse segmentParse) throws IOException {
- super(9999);
- this.dataCarrier = new DataCarrier<>(1, 1024);
- this.dataCarrier.consume(new Consumer(this), 1);
-
- String directory = "/Users/pengys5/code/sky-walking/buffer-test";
- BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(directory);
-// builder.cleanWhenRestart(true);
- builder.dataFileMaxSize(50);
- builder.offsetFileMaxSize(10);
+ public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
+ int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
+ super(Integer.MAX_VALUE);
+ DataCarrier<SegmentStandardization> dataCarrier = new DataCarrier<>(1, 1024);
+ dataCarrier.consume(new Consumer(this), 1);
+
+ BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path);
+ builder.cleanWhenRestart(cleanWhenRestart);
+ builder.dataFileMaxSize(dataFileMaxSize);
+ builder.offsetFileMaxSize(offsetFileMaxSize);
builder.parser(UpstreamSegment.parser());
builder.callBack(segmentParse);
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
new file mode 100644
index 0000000..d19be0e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import io.grpc.*;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class AgentDataMock {
+
+    private static final Logger logger = LoggerFactory.getLogger(AgentDataMock.class);
+
+    // Set by the response observer callbacks and polled by main(); volatile so the
+    // polling loop is guaranteed to see a write made from another thread.
+    private static volatile boolean IS_COMPLETED = false;
+
+    /**
+     * Registers the mock consumer and provider against localhost:11800, sends one consumer
+     * segment and one provider segment that share a global trace id, then waits until the
+     * server terminates the response stream.
+     *
+     * @param args unused
+     * @throws InterruptedException if the thread is interrupted while registering or waiting
+     */
+    public static void main(String[] args) throws InterruptedException {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
+
+        RegisterMock registerMock = new RegisterMock();
+        registerMock.mock(channel);
+
+        StreamObserver<UpstreamSegment> streamObserver = createStreamObserver();
+
+        UniqueId.Builder globalTraceId = UniqueIdBuilder.INSTANCE.create();
+        long startTimestamp = System.currentTimeMillis();
+
+        ConsumerMock consumerMock = new ConsumerMock();
+        UniqueId.Builder consumerSegmentId = UniqueIdBuilder.INSTANCE.create();
+        consumerMock.mock(streamObserver, globalTraceId, consumerSegmentId, startTimestamp, true);
+
+        ProviderMock providerMock = new ProviderMock();
+        UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create();
+        providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, true);
+
+        streamObserver.onCompleted();
+        // Poll until the response stream has been terminated (completed or failed).
+        while (!IS_COMPLETED) {
+            TimeUnit.MILLISECONDS.sleep(500);
+        }
+    }
+
+    /**
+     * Opens the trace segment upload stream on its own channel to localhost:11800.
+     *
+     * @return the request observer used to push {@code UpstreamSegment} messages
+     */
+    private static StreamObserver<UpstreamSegment> createStreamObserver() {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
+        TraceSegmentServiceGrpc.TraceSegmentServiceStub stub = TraceSegmentServiceGrpc.newStub(channel);
+        return stub.collect(new StreamObserver<Downstream>() {
+            @Override public void onNext(Downstream downstream) {
+            }
+
+            @Override public void onError(Throwable throwable) {
+                // onError is terminal: onCompleted() will not follow, so release main() here
+                // instead of leaving it polling forever.
+                logger.error(throwable.getMessage(), throwable);
+                IS_COMPLETED = true;
+            }
+
+            @Override public void onCompleted() {
+                IS_COMPLETED = true;
+            }
+        });
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ConsumerMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ConsumerMock.java
new file mode 100644
index 0000000..9d33e91
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ConsumerMock.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author peng-yongsheng
+ */
+class ConsumerMock {
+
+    /**
+     * Builds one consumer-side segment and pushes it onto the given stream.
+     *
+     * @param streamObserver request stream the upstream segment is sent on
+     * @param globalTraceId trace id added to the upstream segment
+     * @param segmentId id assigned to the generated trace segment
+     * @param startTimestamp base time; every span start/end is an offset added to it
+     * @param isPrepare when true spans carry operation names and peer addresses,
+     * otherwise the numeric operation name / peer ids
+     */
+    void mock(StreamObserver<UpstreamSegment> streamObserver, UniqueId.Builder globalTraceId,
+        UniqueId.Builder segmentId, long startTimestamp, boolean isPrepare) {
+        UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
+        upstreamSegment.addGlobalTraceIds(globalTraceId);
+        upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, isPrepare));
+
+        streamObserver.onNext(upstreamSegment.build());
+    }
+
+    /**
+     * Serializes a segment of application 1 / instance 1 holding six spans: the HTTP entry (0),
+     * a local span (1), and two MQ entry spans (2 and 4) that each parent a Dubbo exit span
+     * (3 and 5).
+     */
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId, boolean isPrepare) {
+        TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
+        segment.setTraceSegmentId(segmentId);
+        segment.setApplicationId(1);
+        segment.setApplicationInstanceId(1);
+        segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
+        segment.addSpans(createLocalSpan(startTimestamp, isPrepare));
+        segment.addSpans(createMqEntrySpan(2, startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan(3, 2, startTimestamp, isPrepare));
+        segment.addSpans(createMqEntrySpan(4, startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan(5, 4, startTimestamp, isPrepare));
+
+        return segment.build().toByteString();
+    }
+
+    /**
+     * Root Tomcat HTTP entry span (id 0, no parent) covering the whole 2000 offset window.
+     */
+    private SpanObject.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(0);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.Http);
+        span.setParentSpanId(-1);
+        span.setStartTime(startTimestamp);
+        span.setEndTime(startTimestamp + 2000);
+        span.setComponentId(ComponentsDefine.TOMCAT.getId());
+        if (isPrepare) {
+            span.setOperationName("/dubbox-case/case/dubbox-rest");
+        } else {
+            span.setOperationNameId(1);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    /**
+     * Local span (id 1) below the root entry span; it sets neither a layer nor a component.
+     */
+    private SpanObject.Builder createLocalSpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(1);
+        span.setSpanType(SpanType.Local);
+        span.setParentSpanId(0);
+        span.setStartTime(startTimestamp + 100);
+        span.setEndTime(startTimestamp + 1900);
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.Local.do");
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    /**
+     * RocketMQ consumer entry span whose parent is the local span (id 1).
+     *
+     * @param spanId id given to the span; the two MQ entry spans differ only in this value
+     */
+    private SpanObject.Builder createMqEntrySpan(int spanId, long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(spanId);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 110);
+        span.setEndTime(startTimestamp + 1800);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.RocketMQ");
+        } else {
+            span.setOperationNameId(3);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    /**
+     * Dubbo exit span towards peer 172.25.0.4:20880 (or peer id 2 when not preparing).
+     *
+     * @param spanId id given to the span
+     * @param parentSpanId id of the MQ entry span this exit span hangs under
+     */
+    private SpanObject.Builder createExitSpan(int spanId, int parentSpanId, long startTimestamp,
+        boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(spanId);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.RPCFramework);
+        span.setParentSpanId(parentSpanId);
+        span.setStartTime(startTimestamp + 120);
+        span.setEndTime(startTimestamp + 1780);
+        span.setComponentId(ComponentsDefine.DUBBO.getId());
+        if (isPrepare) {
+            span.setPeer("172.25.0.4:20880");
+            span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
+        } else {
+            span.setOperationNameId(4);
+            span.setPeerId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ProviderMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ProviderMock.java
new file mode 100644
index 0000000..5b353c9
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ProviderMock.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author peng-yongsheng
+ */
+class ProviderMock {
+
+    /**
+     * Builds one provider-side segment that references the consumer segment and pushes it onto
+     * the given stream.
+     */
+    void mock(StreamObserver<UpstreamSegment> streamObserver, UniqueId.Builder globalTraceId,
+        UniqueId.Builder segmentId, UniqueId.Builder parentTraceSegmentId, long startTimestamp, boolean isPrepare) {
+        UpstreamSegment message = UpstreamSegment.newBuilder()
+            .addGlobalTraceIds(globalTraceId)
+            .setSegment(createSegment(startTimestamp, segmentId, parentTraceSegmentId, isPrepare))
+            .build();
+
+        streamObserver.onNext(message);
+    }
+
+    /**
+     * Serializes a segment of application 2 / instance 2 that holds the database exit span
+     * followed by the Dubbo entry span.
+     */
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
+        UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        return TraceSegmentObject.newBuilder()
+            .setTraceSegmentId(segmentId)
+            .setApplicationId(2)
+            .setApplicationInstanceId(2)
+            .addSpans(createExitSpan(startTimestamp, isPrepare))
+            .addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare))
+            .build()
+            .toByteString();
+    }
+
+    /**
+     * Cross-process reference to span 1 of the parent segment on application instance 1,
+     * expressed with names when preparing and with ids otherwise.
+     */
+    private TraceSegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        TraceSegmentReference.Builder ref = TraceSegmentReference.newBuilder()
+            .setParentTraceSegmentId(parentTraceSegmentId)
+            .setParentApplicationInstanceId(1)
+            .setParentSpanId(1)
+            .setEntryApplicationInstanceId(1)
+            .setRefType(RefType.CrossProcess);
+
+        if (!isPrepare) {
+            return ref
+                .setParentServiceId(1)
+                .setNetworkAddressId(2)
+                .setEntryServiceId(1);
+        }
+        return ref
+            .setParentServiceName("/dubbox-case/case/dubbox-rest")
+            .setNetworkAddress("172.25.0.4:20880")
+            .setEntryServiceName("/dubbox-case/case/dubbox-rest");
+    }
+
+    /**
+     * MongoDB exit span (id 1, parent 0); this is the only span flagged as an error.
+     */
+    private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder exit = SpanObject.newBuilder()
+            .setSpanId(1)
+            .setSpanType(SpanType.Exit)
+            .setSpanLayer(SpanLayer.Database)
+            .setParentSpanId(0)
+            .setStartTime(startTimestamp + 510)
+            .setEndTime(startTimestamp + 1490)
+            .setComponentId(ComponentsDefine.MONGO_DRIVER.getId())
+            .setIsError(true);
+
+        if (!isPrepare) {
+            return exit.setOperationNameId(5).setPeerId(1);
+        }
+        return exit
+            .setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]")
+            .setPeer("localhost:27017");
+    }
+
+    /**
+     * Dubbo entry span (id 0, no parent) carrying the reference back to the parent segment.
+     */
+    private SpanObject.Builder createEntrySpan(long startTimestamp, UniqueId.Builder parentSegmentId,
+        boolean isPrepare) {
+        SpanObject.Builder entry = SpanObject.newBuilder()
+            .setSpanId(0)
+            .setSpanType(SpanType.Entry)
+            .setSpanLayer(SpanLayer.RPCFramework)
+            .setParentSpanId(-1)
+            .setStartTime(startTimestamp + 500)
+            .setEndTime(startTimestamp + 1500)
+            .setComponentId(ComponentsDefine.DUBBO.getId())
+            .setIsError(false)
+            .addRefs(createReference(parentSegmentId, isPrepare));
+
+        return isPrepare
+            ? entry.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()")
+            : entry.setOperationNameId(6);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java
new file mode 100644
index 0000000..3e9915a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import io.grpc.ManagedChannel;
+import java.util.UUID;
+import java.util.concurrent.*;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import org.joda.time.DateTime;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class RegisterMock {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterMock.class);
+
+    // Pause after every registration attempt, in milliseconds.
+    private static final long RETRY_INTERVAL_MILLIS = 20;
+
+    private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
+    private InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
+    private ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
+
+    /**
+     * Creates the blocking stubs on the given channel, then registers the consumer and the
+     * provider application together with one instance each.
+     *
+     * @param channel channel the blocking stubs are created on
+     * @throws InterruptedException if interrupted while pausing between attempts
+     */
+    void mock(ManagedChannel channel) throws InterruptedException {
+        applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
+        instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
+        serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
+        registerConsumer();
+        registerProvider();
+    }
+
+    /**
+     * Registers application "dubbox-consumer" and one instance of it.
+     */
+    private void registerConsumer() throws InterruptedException {
+        int applicationId = registerApplication("dubbox-consumer");
+
+        OSInfo.Builder osInfo = OSInfo.newBuilder();
+        osInfo.setHostname("pengys");
+        osInfo.setOsName("MacOS XX");
+        osInfo.setProcessNo(1001);
+        osInfo.addIpv4S("10.0.0.3");
+        osInfo.addIpv4S("10.0.0.4");
+        registerInstance(applicationId, osInfo);
+
+        // Service-name registration (GreetService.doBusiness() as an Exit span) and the
+        // heartbeat schedule are currently switched off for this mock; the helpers
+        // registerServiceName(...) and heartBeatScheduled(...) stay available below.
+    }
+
+    /**
+     * Registers application "dubbox-provider" and one instance of it.
+     */
+    private void registerProvider() throws InterruptedException {
+        int applicationId = registerApplication("dubbox-provider");
+
+        OSInfo.Builder osInfo = OSInfo.newBuilder();
+        osInfo.setHostname("peng-yongsheng");
+        osInfo.setOsName("MacOS X");
+        osInfo.setProcessNo(1000);
+        osInfo.addIpv4S("10.0.0.1");
+        osInfo.addIpv4S("10.0.0.2");
+        registerInstance(applicationId, osInfo);
+
+        // Service-name registration (GreetService.doBusiness() as an Entry span) and the
+        // heartbeat schedule are currently switched off for this mock; the helpers
+        // registerServiceName(...) and heartBeatScheduled(...) stay available below.
+    }
+
+    /**
+     * Submits the application code repeatedly until the server answers with a non-zero id.
+     *
+     * @param applicationCode code of the application to register
+     * @return the non-zero application id returned by the server
+     * @throws InterruptedException if interrupted while pausing between attempts
+     */
+    private int registerApplication(String applicationCode) throws InterruptedException {
+        Application application = Application.newBuilder().setApplicationCode(applicationCode).build();
+
+        int applicationId;
+        do {
+            applicationId = applicationRegisterServiceBlockingStub.applicationCodeRegister(application).getApplication().getValue();
+            logger.debug("application id: {}", applicationId);
+            TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS);
+        }
+        while (applicationId == 0);
+        return applicationId;
+    }
+
+    /**
+     * Submits an instance with a random agent UUID and a fixed register time repeatedly until
+     * the server answers with a non-zero instance id.
+     *
+     * @param applicationId id of the application the instance belongs to
+     * @param osInfo operating system details attached to the instance
+     * @return the non-zero instance id returned by the server
+     * @throws InterruptedException if interrupted while pausing between attempts
+     */
+    private int registerInstance(int applicationId, OSInfo.Builder osInfo) throws InterruptedException {
+        ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
+        instance.setApplicationId(applicationId);
+        instance.setAgentUUID(UUID.randomUUID().toString());
+        instance.setRegisterTime(new DateTime("2017-01-01T00:01:01.001").getMillis());
+        instance.setOsinfo(osInfo);
+        ApplicationInstance request = instance.build();
+
+        int instanceId;
+        do {
+            instanceId = instanceDiscoveryServiceBlockingStub.registerInstance(request).getApplicationInstanceId();
+            logger.debug("instance id: {}", instanceId);
+            TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS);
+        }
+        while (instanceId == 0);
+        return instanceId;
+    }
+
+    /**
+     * Submits the service names repeatedly until the first returned mapping has a non-zero
+     * service id. Not invoked by the register methods at the moment.
+     */
+    private void registerServiceName(ServiceNameCollection.Builder serviceNameCollection) throws InterruptedException {
+        ServiceNameMappingCollection serviceNameMappingCollection;
+        do {
+            logger.debug("register service name: {}", serviceNameCollection.getElements(0).getServiceName());
+            serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
+            logger.debug("service name mapping collection size: {}", serviceNameMappingCollection.getElementsCount());
+            if (serviceNameMappingCollection.getElementsCount() > 0) {
+                logger.debug("service id: {}", serviceNameMappingCollection.getElements(0).getServiceId());
+            }
+            TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS);
+        }
+        while (serviceNameMappingCollection.getElementsCount() == 0 || serviceNameMappingCollection.getElements(0).getServiceId() == 0);
+    }
+
+    /**
+     * Schedules a heartbeat for the instance every second after an initial 4 second delay.
+     * Not invoked by the register methods at the moment.
+     */
+    private void heartBeatScheduled(int instanceId) {
+        // NOTE(review): the executor is never shut down; if this is re-enabled its worker
+        // thread will presumably keep the mock's JVM alive - confirm before switching it on.
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+            new RunnableWithExceptionProtection(() -> heartBeat(instanceId),
+                t -> logger.error("instance heart beat scheduled error.", t)), 4, 1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Sends a single heartbeat stamped with the current time for the given instance.
+     */
+    private void heartBeat(int instanceId) {
+        long now = System.currentTimeMillis();
+        logger.debug("instance heart beat, instance id: {}, time: {}", instanceId, now);
+        ApplicationInstanceHeartbeat.Builder heartbeat = ApplicationInstanceHeartbeat.newBuilder();
+        heartbeat.setApplicationInstanceId(instanceId);
+        heartbeat.setHeartbeatTime(now);
+        instanceDiscoveryServiceBlockingStub.heartbeat(heartbeat.build());
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
similarity index 51%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
index 2ce7c97..e64adff 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
@@ -16,24 +16,24 @@
*
*/
-package org.apache.skywalking.oap.server.core;
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
/**
* @author peng-yongsheng
*/
-public class Const {
- public static final int NONE = 0;
- public static final String ID_SPLIT = "_";
- public static final int NONE_SERVICE_ID = 1;
- public static final int NONE_INSTANCE_ID = 1;
- public static final int NONE_ENDPOINT_ID = 1;
- public static final String NONE_ENDPOINT_NAME = "None";
- public static final String USER_CODE = "User";
- public static final String SEGMENT_SPAN_SPLIT = "S";
- public static final String UNKNOWN = "Unknown";
- public static final String EXCEPTION = "Exception";
- public static final String EMPTY_STRING = "";
- public static final String FILE_SUFFIX = "sw";
- public static final int SPAN_TYPE_VIRTUAL = 9;
- public static final String DOMAIN_OPERATION_NAME = "{domain}";
+public enum UniqueIdBuilder {
+    INSTANCE;
+
+    // Counter shared by every id created through this singleton; starts at 1.
+    // Declared final so the reference can never be swapped out after construction.
+    private final AtomicLong idPart = new AtomicLong(1);
+
+    /**
+     * Creates a builder pre-populated with three id parts, each drawn from the shared counter.
+     * Each part is obtained with its own atomic increment, so the three values are consecutive
+     * only when no other caller draws from the counter in between.
+     *
+     * @return a new {@link UniqueId.Builder} carrying three id parts
+     */
+    UniqueId.Builder create() {
+        UniqueId.Builder uniqueId = UniqueId.newBuilder();
+        for (int part = 0; part < 3; part++) {
+            uniqueId.addIdParts(idPart.getAndIncrement());
+        }
+        return uniqueId;
+    }
}
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/resources/log4j2.xml
similarity index 70%
copy from oap-server/server-starter/src/main/resources/log4j2.xml
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/resources/log4j2.xml
index 65c5203..6eb5b3f 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/resources/log4j2.xml
@@ -17,20 +17,14 @@
~
-->
-<Configuration status="INFO">
+<Configuration status="DEBUG">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
- <logger name="org.eclipse.jetty" level="INFO"/>
- <logger name="org.apache.zookeeper" level="INFO"/>
- <logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
- <logger name="io.grpc.netty" level="INFO"/>
- <logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
- <logger name="org.apache.skywalking.oap.server.core.remote" level="DEBUG"/>
- <Root level="INFO">
+ <Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index f7e244a..9290a93 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -53,6 +53,10 @@ receiver-register:
default:
receiver-trace:
default:
+ bufferPath: ../buffer/ # Path to trace buffer files; an absolute path is recommended
+ bufferOffsetMaxFileSize: 100 # Unit is MB
+ bufferDataMaxFileSize: 500 # Unit is MB
+ bufferFileCleanWhenRestart: false
receiver-jvm:
default:
service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-starter/src/main/resources/log4j2.xml
index 65c5203..2da2d13 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-starter/src/main/resources/log4j2.xml
@@ -27,10 +27,13 @@
<logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/>
<logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
+ <logger name="org.elasticsearch.client.RestClient" level="INFO"/>
<logger name="io.grpc.netty" level="INFO"/>
+ <logger name="io.netty" level="INFO"/>
+ <logger name="org.apache.http" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core.remote" level="DEBUG"/>
- <Root level="INFO">
+ <Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
index 1790854..13c6711 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
@@ -91,7 +91,7 @@ public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
- return 1;
+ return 0;
} else {
return id;
}