You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/10/26 02:58:13 UTC
[incubator-skywalking] branch mesh-receiver updated: Add codes for debug.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch mesh-receiver
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/mesh-receiver by this push:
new afadbf9 Add codes for debug.
afadbf9 is described below
commit afadbf9816aaf73d8f95f437db428244914da848
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Oct 26 10:58:00 2018 +0800
Add codes for debug.
---
.../apm/commons/datacarrier/DataCarrier.java | 10 +++++++--
.../commons/datacarrier/consumer/ConsumerPool.java | 8 +++----
.../datacarrier/consumer/ConsumerPoolTest.java | 4 ++--
.../analysis/indicator/ThermodynamicIndicator.java | 17 ++++++++++++--
.../analysis/worker/IndicatorAggregateWorker.java | 5 +++--
.../analysis/worker/IndicatorPersistentWorker.java | 2 +-
.../core/analysis/worker/IndicatorProcess.java | 2 +-
.../core/remote/client/GRPCRemoteClient.java | 2 +-
.../core/storage/ttl/DataTTLKeeperTimer.java | 26 +++++++++++++---------
.../receiver/mesh/MeshDataBufferFileCache.java | 2 +-
.../receiver/mesh/TelemetryDataDispatcher.java | 16 ++++++++++---
11 files changed, 65 insertions(+), 29 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index d53fda5..76ad609 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -33,8 +33,14 @@ public class DataCarrier<T> {
private final int channelSize;
private Channels<T> channels;
private ConsumerPool<T> consumerPool;
+ private String name;
public DataCarrier(int channelSize, int bufferSize) {
+ this("default", channelSize, bufferSize);
+ }
+
+ public DataCarrier(String name, int channelSize, int bufferSize) {
+ this.name = name;
this.bufferSize = bufferSize;
this.channelSize = channelSize;
channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
@@ -92,7 +98,7 @@ public class DataCarrier<T> {
if (consumerPool != null) {
consumerPool.close();
}
- consumerPool = new ConsumerPool<T>(this.channels, consumerClass, num, consumeCycle);
+ consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
consumerPool.begin();
return this;
}
@@ -119,7 +125,7 @@ public class DataCarrier<T> {
if (consumerPool != null) {
consumerPool.close();
}
- consumerPool = new ConsumerPool<T>(this.channels, consumer, num, consumeCycle);
+ consumerPool = new ConsumerPool<T>(this.name, this.channels, consumer, num, consumeCycle);
consumerPool.begin();
return this;
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
index 814fdf8..7ba0620 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
@@ -32,19 +32,19 @@ public class ConsumerPool<T> {
private Channels<T> channels;
private ReentrantLock lock;
- public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
+ public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
this(channels, num);
for (int i = 0; i < num; i++) {
- consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
+ consumerThreads[i] = new ConsumerThread("DataCarrier."+ name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
- public ConsumerPool(Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
+ public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
this(channels, num);
prototype.init();
for (int i = 0; i < num; i++) {
- consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", prototype, consumeCycle);
+ consumerThreads[i] = new ConsumerThread("DataCarrier."+ name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
consumerThreads[i].setDaemon(true);
}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
index 18eab0e..885a5bf 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
@@ -34,7 +34,7 @@ public class ConsumerPoolTest {
@Test
public void testBeginConsumerPool() throws IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
- ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2, 20);
+ ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
pool.begin();
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
@@ -46,7 +46,7 @@ public class ConsumerPoolTest {
@Test
public void testCloseConsumerPool() throws InterruptedException, IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
- ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2, 20);
+ ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
pool.begin();
Thread.sleep(5000);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/ThermodynamicIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/ThermodynamicIndicator.java
index 23ded0f..cef4342 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/ThermodynamicIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/ThermodynamicIndicator.java
@@ -19,9 +19,12 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Thermodynamic indicator represents the calculator for heat map.
@@ -35,6 +38,9 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
*/
@IndicatorOperator
public abstract class ThermodynamicIndicator extends Indicator {
+ private static final Logger logger = LoggerFactory.getLogger(ThermodynamicIndicator.class);
+ private static AtomicInteger INDEX = new AtomicInteger(0);
+
public static final String DETAIL_GROUP = "detail_group";
public static final String STEP = "step";
public static final String NUM_OF_STEPS = "num_of_steps";
@@ -57,6 +63,7 @@ public abstract class ThermodynamicIndicator extends Indicator {
*/
@Entrance
public final void combine(@SourceFrom int value, @Arg int step, @Arg int maxNumOfSteps) {
+ logger.info("ThermodynamicIndicator get " + INDEX.addAndGet(1));
if (this.step == 0) {
this.step = step;
}
@@ -86,18 +93,24 @@ public abstract class ThermodynamicIndicator extends Indicator {
ThermodynamicIndicator thermodynamicIndicator = (ThermodynamicIndicator)indicator;
this.indexCheckAndInit();
thermodynamicIndicator.indexCheckAndInit();
+ final ThermodynamicIndicator self = this;
thermodynamicIndicator.detailIndex.forEach((key, element) -> {
- IntKeyLongValue existingElement = this.detailIndex.get(key);
+ IntKeyLongValue existingElement = self.detailIndex.get(key);
+ logger.info("prepare merging id =" + key + ", value=" + element.getValue());
if (existingElement == null) {
existingElement = new IntKeyLongValue();
existingElement.setKey(key);
existingElement.setValue(element.getValue());
- addElement(element);
+ self.addElement(element);
} else {
existingElement.addValue(element.getValue());
}
+
+ logger.info("result=" + self.detailGroup.toStorageData());
});
+
+ logger.info("after combine, " + self.detailGroup.toStorageData());
}
/**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 15ff40b..2a99df0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
import org.slf4j.*;
/**
@@ -38,11 +39,11 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
private final MergeDataCache<Indicator> mergeDataCache;
private int messageNum;
- IndicatorAggregateWorker(int workerId, AbstractWorker<Indicator> nextWorker) {
+ IndicatorAggregateWorker(int workerId, AbstractWorker<Indicator> nextWorker, String modelName) {
super(workerId);
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
- this.dataCarrier = new DataCarrier<>(1, 10000);
+ this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index d482b30..1f7b572 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -50,7 +50,7 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
this.mergeDataCache = new MergeDataCache<>();
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
- this.dataCarrier = new DataCarrier<>(1, 10000);
+ this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
this.dataCarrier.consume(new IndicatorPersistentWorker.PersistentConsumer(this), 1);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
index 64e8f31..90dee0f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
@@ -63,7 +63,7 @@ public enum IndicatorProcess {
IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, transWorker);
WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
- IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker);
+ IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker, modelName);
WorkerInstances.INSTANCES.put(aggregateWorker.getWorkerId(), aggregateWorker);
entryWorkers.put(indicatorClass, aggregateWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index c2cb319..48106ee 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -45,7 +45,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
int bufferSize) {
this.streamDataClassGetter = streamDataClassGetter;
this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
- this.carrier = new DataCarrier<>(channelSize, bufferSize);
+ this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
this.carrier.consume(new RemoteMessageConsumer(), 1);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 98a08b3..13e249a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -20,20 +20,26 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.*;
import lombok.Setter;
-import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.DataTTL;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.*;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.Downsampling;
+import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.joda.time.DateTime;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
@@ -51,9 +57,9 @@ public enum DataTTLKeeperTimer {
this.moduleManager = moduleManager;
this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class);
- Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
- new RunnableWithExceptionProtection(this::delete,
- t -> logger.error("Remove data in background failure.", t)), 1, 5, TimeUnit.MINUTES);
+// Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+// new RunnableWithExceptionProtection(this::delete,
+// t -> logger.error("Remove data in background failure.", t)), 1, 5, TimeUnit.MINUTES);
}
private void delete() {
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
index 2ea9237..8089fe2 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
@@ -33,7 +33,7 @@ public class MeshDataBufferFileCache implements IConsumer<ServiceMeshMetricDataD
public MeshDataBufferFileCache(MeshModuleConfig config) {
this.config = config;
- dataCarrier = new DataCarrier<>(3, 1024);
+ dataCarrier = new DataCarrier<>("MeshDataBufferFileCache", 3, 1024);
}
void start() throws IOException {
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 6ddeb6c..35121d0 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.aop.server.receiver.mesh;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.apm.network.servicemesh.Protocol;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
@@ -35,6 +36,8 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -43,6 +46,9 @@ import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
* @author wusheng
*/
public class TelemetryDataDispatcher {
+ private static final Logger logger = LoggerFactory.getLogger(MeshGRPCHandler.class);
+ private static AtomicInteger INDEX = new AtomicInteger(0);
+
private static MeshDataBufferFileCache CACHE;
private static ServiceInventoryCache SERVICE_CACHE;
private static ServiceInstanceInventoryCache SERVICE_INSTANCE_CACHE;
@@ -76,12 +82,16 @@ public class TelemetryDataDispatcher {
static void doDispatch(ServiceMeshMetricDataDecorator decorator) {
ServiceMeshMetric metric = decorator.getMetric();
long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getStartTime());
- toAll(decorator, minuteTimeBucket);
- toService(decorator, minuteTimeBucket);
+
+ if (org.apache.skywalking.apm.network.common.DetectPoint.server.equals(metric.getDetectPoint())) {
+ logger.info("dispatch server: " + INDEX.addAndGet(1));
+ toAll(decorator, minuteTimeBucket);
+ toService(decorator, minuteTimeBucket);
+ toEndpoint(decorator, minuteTimeBucket);
+ }
toServiceRelation(decorator, minuteTimeBucket);
toServiceInstance(decorator, minuteTimeBucket);
toServiceInstanceRelation(decorator, minuteTimeBucket);
- toEndpoint(decorator, minuteTimeBucket);
}
private static void toAll(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {