You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/10/26 02:58:13 UTC

[incubator-skywalking] branch mesh-receiver updated: Add codes for debug.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch mesh-receiver
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/mesh-receiver by this push:
     new afadbf9  Add codes for debug.
afadbf9 is described below

commit afadbf9816aaf73d8f95f437db428244914da848
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Oct 26 10:58:00 2018 +0800

    Add codes for debug.
---
 .../apm/commons/datacarrier/DataCarrier.java       | 10 +++++++--
 .../commons/datacarrier/consumer/ConsumerPool.java |  8 +++----
 .../datacarrier/consumer/ConsumerPoolTest.java     |  4 ++--
 .../analysis/indicator/ThermodynamicIndicator.java | 17 ++++++++++++--
 .../analysis/worker/IndicatorAggregateWorker.java  |  5 +++--
 .../analysis/worker/IndicatorPersistentWorker.java |  2 +-
 .../core/analysis/worker/IndicatorProcess.java     |  2 +-
 .../core/remote/client/GRPCRemoteClient.java       |  2 +-
 .../core/storage/ttl/DataTTLKeeperTimer.java       | 26 +++++++++++++---------
 .../receiver/mesh/MeshDataBufferFileCache.java     |  2 +-
 .../receiver/mesh/TelemetryDataDispatcher.java     | 16 ++++++++++---
 11 files changed, 65 insertions(+), 29 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index d53fda5..76ad609 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -33,8 +33,14 @@ public class DataCarrier<T> {
     private final int channelSize;
     private Channels<T> channels;
     private ConsumerPool<T> consumerPool;
+    private String name;
 
     public DataCarrier(int channelSize, int bufferSize) {
+        this("default", channelSize, bufferSize);
+    }
+
+    public DataCarrier(String name, int channelSize, int bufferSize) {
+        this.name = name;
         this.bufferSize = bufferSize;
         this.channelSize = channelSize;
         channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
@@ -92,7 +98,7 @@ public class DataCarrier<T> {
         if (consumerPool != null) {
             consumerPool.close();
         }
-        consumerPool = new ConsumerPool<T>(this.channels, consumerClass, num, consumeCycle);
+        consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
         consumerPool.begin();
         return this;
     }
@@ -119,7 +125,7 @@ public class DataCarrier<T> {
         if (consumerPool != null) {
             consumerPool.close();
         }
-        consumerPool = new ConsumerPool<T>(this.channels, consumer, num, consumeCycle);
+        consumerPool = new ConsumerPool<T>(this.name, this.channels, consumer, num, consumeCycle);
         consumerPool.begin();
         return this;
     }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
index 814fdf8..7ba0620 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
@@ -32,19 +32,19 @@ public class ConsumerPool<T> {
     private Channels<T> channels;
     private ReentrantLock lock;
 
-    public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
+    public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
         this(channels, num);
         for (int i = 0; i < num; i++) {
-            consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
+            consumerThreads[i] = new ConsumerThread("DataCarrier."+ name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
             consumerThreads[i].setDaemon(true);
         }
     }
 
-    public ConsumerPool(Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
+    public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
         this(channels, num);
         prototype.init();
         for (int i = 0; i < num; i++) {
-            consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", prototype, consumeCycle);
+            consumerThreads[i] = new ConsumerThread("DataCarrier."+ name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
             consumerThreads[i].setDaemon(true);
         }
 
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
index 18eab0e..885a5bf 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
@@ -34,7 +34,7 @@ public class ConsumerPoolTest {
     @Test
     public void testBeginConsumerPool() throws IllegalAccessException {
         Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
-        ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2, 20);
+        ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
         pool.begin();
 
         ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
@@ -46,7 +46,7 @@ public class ConsumerPoolTest {
     @Test
     public void testCloseConsumerPool() throws InterruptedException, IllegalAccessException {
         Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
-        ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2, 20);
+        ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
         pool.begin();
 
         Thread.sleep(5000);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/ThermodynamicIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/ThermodynamicIndicator.java
index 23ded0f..cef4342 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/ThermodynamicIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/ThermodynamicIndicator.java
@@ -19,9 +19,12 @@
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Thermodynamic indicator represents the calculator for heat map.
@@ -35,6 +38,9 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
  */
 @IndicatorOperator
 public abstract class ThermodynamicIndicator extends Indicator {
+    private static final Logger logger = LoggerFactory.getLogger(ThermodynamicIndicator.class);
+    private static AtomicInteger INDEX = new AtomicInteger(0);
+
     public static final String DETAIL_GROUP = "detail_group";
     public static final String STEP = "step";
     public static final String NUM_OF_STEPS = "num_of_steps";
@@ -57,6 +63,7 @@ public abstract class ThermodynamicIndicator extends Indicator {
      */
     @Entrance
     public final void combine(@SourceFrom int value, @Arg int step, @Arg int maxNumOfSteps) {
+        logger.info("ThermodynamicIndicator get " + INDEX.addAndGet(1));
         if (this.step == 0) {
             this.step = step;
         }
@@ -86,18 +93,24 @@ public abstract class ThermodynamicIndicator extends Indicator {
         ThermodynamicIndicator thermodynamicIndicator = (ThermodynamicIndicator)indicator;
         this.indexCheckAndInit();
         thermodynamicIndicator.indexCheckAndInit();
+        final ThermodynamicIndicator self = this;
 
         thermodynamicIndicator.detailIndex.forEach((key, element) -> {
-            IntKeyLongValue existingElement = this.detailIndex.get(key);
+            IntKeyLongValue existingElement = self.detailIndex.get(key);
+            logger.info("prepare merging id =" + key + ", value=" + element.getValue());
             if (existingElement == null) {
                 existingElement = new IntKeyLongValue();
                 existingElement.setKey(key);
                 existingElement.setValue(element.getValue());
-                addElement(element);
+                self.addElement(element);
             } else {
                 existingElement.addValue(element.getValue());
             }
+
+            logger.info("result=" + self.detailGroup.toStorageData());
         });
+
+        logger.info("after combine, " + self.detailGroup.toStorageData());
     }
 
     /**
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 15ff40b..2a99df0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
 import org.slf4j.*;
 
 /**
@@ -38,11 +39,11 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
     private final MergeDataCache<Indicator> mergeDataCache;
     private int messageNum;
 
-    IndicatorAggregateWorker(int workerId, AbstractWorker<Indicator> nextWorker) {
+    IndicatorAggregateWorker(int workerId, AbstractWorker<Indicator> nextWorker, String modelName) {
         super(workerId);
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergeDataCache<>();
-        this.dataCarrier = new DataCarrier<>(1, 10000);
+        this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
         this.dataCarrier.consume(new AggregatorConsumer(this), 1);
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index d482b30..1f7b572 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -50,7 +50,7 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
         this.mergeDataCache = new MergeDataCache<>();
         this.indicatorDAO = indicatorDAO;
         this.nextWorker = nextWorker;
-        this.dataCarrier = new DataCarrier<>(1, 10000);
+        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
         this.dataCarrier.consume(new IndicatorPersistentWorker.PersistentConsumer(this), 1);
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
index 64e8f31..90dee0f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
@@ -63,7 +63,7 @@ public enum IndicatorProcess {
         IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, transWorker);
         WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
 
-        IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker);
+        IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker, modelName);
         WorkerInstances.INSTANCES.put(aggregateWorker.getWorkerId(), aggregateWorker);
 
         entryWorkers.put(indicatorClass, aggregateWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index c2cb319..48106ee 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -45,7 +45,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
         int bufferSize) {
         this.streamDataClassGetter = streamDataClassGetter;
         this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
-        this.carrier = new DataCarrier<>(channelSize, bufferSize);
+        this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
         this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
         this.carrier.consume(new RemoteMessageConsumer(), 1);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 98a08b3..13e249a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -20,20 +20,26 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.*;
 import lombok.Setter;
-import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.DataTTL;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
 import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.*;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.Downsampling;
+import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.joda.time.DateTime;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -51,9 +57,9 @@ public enum DataTTLKeeperTimer {
         this.moduleManager = moduleManager;
         this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class);
 
-        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
-            new RunnableWithExceptionProtection(this::delete,
-                t -> logger.error("Remove data in background failure.", t)), 1, 5, TimeUnit.MINUTES);
+//        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+//            new RunnableWithExceptionProtection(this::delete,
+//                t -> logger.error("Remove data in background failure.", t)), 1, 5, TimeUnit.MINUTES);
     }
 
     private void delete() {
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
index 2ea9237..8089fe2 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
@@ -33,7 +33,7 @@ public class MeshDataBufferFileCache implements IConsumer<ServiceMeshMetricDataD
 
     public MeshDataBufferFileCache(MeshModuleConfig config) {
         this.config = config;
-        dataCarrier = new DataCarrier<>(3, 1024);
+        dataCarrier = new DataCarrier<>("MeshDataBufferFileCache", 3, 1024);
     }
 
     void start() throws IOException {
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 6ddeb6c..35121d0 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.aop.server.receiver.mesh;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.logging.log4j.util.Strings;
 import org.apache.skywalking.apm.network.servicemesh.Protocol;
 import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
@@ -35,6 +36,8 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -43,6 +46,9 @@ import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
  * @author wusheng
  */
 public class TelemetryDataDispatcher {
+    private static final Logger logger = LoggerFactory.getLogger(MeshGRPCHandler.class);
+    private static AtomicInteger INDEX = new AtomicInteger(0);
+
     private static MeshDataBufferFileCache CACHE;
     private static ServiceInventoryCache SERVICE_CACHE;
     private static ServiceInstanceInventoryCache SERVICE_INSTANCE_CACHE;
@@ -76,12 +82,16 @@ public class TelemetryDataDispatcher {
     static void doDispatch(ServiceMeshMetricDataDecorator decorator) {
         ServiceMeshMetric metric = decorator.getMetric();
         long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getStartTime());
-        toAll(decorator, minuteTimeBucket);
-        toService(decorator, minuteTimeBucket);
+
+        if (org.apache.skywalking.apm.network.common.DetectPoint.server.equals(metric.getDetectPoint())) {
+            logger.info("dispatch server: " + INDEX.addAndGet(1));
+            toAll(decorator, minuteTimeBucket);
+            toService(decorator, minuteTimeBucket);
+            toEndpoint(decorator, minuteTimeBucket);
+        }
         toServiceRelation(decorator, minuteTimeBucket);
         toServiceInstance(decorator, minuteTimeBucket);
         toServiceInstanceRelation(decorator, minuteTimeBucket);
-        toEndpoint(decorator, minuteTimeBucket);
     }
 
     private static void toAll(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {